Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SubscribeTickers func to providers #592

Merged
merged 17 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#551](https://github.com/umee-network/umee/pull/551) Update Binance provider to use WebSocket.
- [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket.
- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket.
- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx.

### Bug Fixes

Expand Down
149 changes: 89 additions & 60 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,38 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mu sync.Mutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
subscribedPairs []types.CurrencyPair
mtx sync.Mutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

// BinanceTicker ticker price response
// https://pkg.go.dev/encoding/json#Unmarshal
// Unmarshal matches incoming object keys to the keys
// used by Marshal (either the struct field name or its tag),
// preferring an exact match but also accepting a case-insensitive match
// C is not used, but it avoids to implement specific UnmarshalJSON
// BinanceTicker ticker price response. https://pkg.go.dev/encoding/json#Unmarshal
// Unmarshal matches incoming object keys to the keys used by Marshal (either the
// struct field name or its tag), preferring an exact match but also accepting a
// case-insensitive match. C field which is Statistics close time is not used, but
// it avoids to implement specific UnmarshalJSON.
BinanceTicker struct {
Symbol string `json:"s"` // Symbol ex.: BTCUSDT
LastPrice string `json:"c"` // Last price ex.: 0.0025
Volume string `json:"v"` // Total traded base asset volume ex.: 1000
C uint64 `json:"C"` // Statistics close time
}

// BinanceCandleMetadata candle metadata used to compute tvwap price.
BinanceCandleMetadata struct {
Close string `json:"c"` // Price at close
TimeStamp int64 `json:"T"` // Close time in unix epoch ex.: 1645756200000
Volume string `json:"v"` // Volume during period
}

// BinanceCandle candle binance websocket channel "kline_1m" response.
BinanceCandle struct {
Symbol string `json:"s"` // Symbol ex.: BTCUSDT
Metadata BinanceCandleMetadata `json:"k"` // Metadata for candle
}

// BinanceSubscribeMsg Msg to subscribe all the tickers channels
// BinanceSubscribeMsg Msg to subscribe all the tickers channels.
BinanceSubscriptionMsg struct {
Method string `json:"method"` // SUBSCRIBE/UNSUBSCRIBE
Params []string `json:"params"` // streams to subscribe ex.: usdtatom@ticker
Expand All @@ -87,13 +88,10 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
logger: logger.With().Str("provider", "binance").Logger(),
tickers: map[string]BinanceTicker{},
candles: map[string]BinanceCandle{},
subscribedPairs: pairs,
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.subscribeTickers(pairs...); err != nil {
return nil, err
}
if err := provider.subscribeCandles(pairs...); err != nil {
if err := provider.SubscribeTickers(pairs...); err != nil {
return nil, err
}

Expand All @@ -102,6 +100,26 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
return provider, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps)*2)

iterator := 0
for _, cp := range cps {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}
Comment on lines +108 to +113
Copy link
Member

@toteki toteki Mar 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is cps guaranteed to be an alternating list of Ticker, Candle, Ticker, Candle, etc?

Copy link
Contributor Author

@RafilxTenfen RafilxTenfen Mar 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the moment, yes

  • maybe after tvwap implemented we could only use candles

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooo I didn't realize this. I'm not really a fan of that approach. It's too brittle IMO. Could we remove that assumption? Maybe just pass two args in instead?

Copy link
Collaborator

@adamewozniak adamewozniak Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After tvwap gets merged can we put these back as separate functions?

Candle & Ticker pairs should generally be considered separate, especially since candles can not always be used immediately (you can't calculate tvwap if the only candle you have is one that closed at that exact moment) - sorry I didn't realize this before. Would make this less brittle too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, these pairs are used only to subscribe to those channels. We could have 2 lists of subscribed pairs and separate them one for each type (Ticker, Candle).

The logic to subscribe in both of those channels was the same. So, in my idea made sense to only iterate through cps ...types.CurrencyPair once

@adamewozniak @toteki @alexanderbez

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rather iterate twice and keep the code clean and legible 🙏

These aren't big lists anyway


if err := p.subscribePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// GetTickerPrices returns the tickerPrices based on the provided pairs.
func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) {
tickerPrices := make(map[string]TickerPrice, len(pairs))
Expand Down Expand Up @@ -150,58 +168,36 @@ func (p *BinanceProvider) messageReceived(messageType int, bz []byte) {
if err := json.Unmarshal(bz, &tickerResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal ticker response")
}
if err := json.Unmarshal(bz, &candleResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal candle response")
}

if len(tickerResp.LastPrice) != 0 {
p.setTickerPair(tickerResp)
return
}

if err := json.Unmarshal(bz, &candleResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal candle response")
return
}
if len(candleResp.Metadata.Close) != 0 {
p.setCandlePair(candleResp)
}
}

func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) {
p.mu.Lock()
defer p.mu.Unlock()
p.mtx.Lock()
defer p.mtx.Unlock()
p.tickers[ticker.Symbol] = ticker
}

func (p *BinanceProvider) setCandlePair(candle BinanceCandle) {
p.mu.Lock()
defer p.mu.Unlock()
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.Symbol] = candle
}

func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) {
return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume)
}

// subscribeTickers subscribe to all currency pairs
func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error {
params := make([]string, len(cps))

for i, cp := range cps {
params[i] = strings.ToLower(cp.String() + "@ticker")
}

subsMsg := newBinanceSubscriptionMsg(params...)
return p.wsClient.WriteJSON(subsMsg)
}

// subscribeCandles subscribe to all candles
func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error {
params := make([]string, len(cps))

for i, cp := range cps {
params[i] = strings.ToLower(cp.String() + "@kline_1m")
}

subsMsg := newBinanceSubscriptionMsg(params...)
return p.wsClient.WriteJSON(subsMsg)
}

func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(defaultMaxConnectionTime)
defer reconnectTicker.Stop()
Expand All @@ -213,7 +209,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
case <-time.After(defaultReadNewWSMessage):
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
// if some error occurs continue to try to read the next message
// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
continue
}
Expand All @@ -233,13 +229,12 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
}
}

// reconnect closes the last WS connection and create a new one
// A single connection to stream.binance.com is only valid for 24 hours;
// expect to be disconnected at the 24 hour mark
// The websocket server will send a ping frame every 3 minutes.
// If the websocket server does not receive a pong frame back from
// the connection within a 10 minute period, the connection will be disconnected.
// Unsolicited pong frames are allowed.
// reconnect closes the last WS connection then create a new one and subscribe to
// all subscribed pairs in the ticker and candle pais. A single connection to
// stream.binance.com is only valid for 24 hours; expect to be disconnected at the
// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If
// the websocket server does not receive a pong frame back from the connection
// within a 10 minute period, the connection will be disconnected.
func (p *BinanceProvider) reconnect() error {
p.wsClient.Close()

Expand All @@ -250,10 +245,16 @@ func (p *BinanceProvider) reconnect() error {
}
p.wsClient = wsConn

if err := p.subscribeCandles(p.subscribedPairs...); err != nil {
return err
pairs := make([]string, len(p.subscribedPairs)*2)
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
}
return p.subscribeTickers(p.subscribedPairs...)

return p.subscribePairs(pairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
Expand All @@ -276,7 +277,35 @@ func (p *BinanceProvider) keepReconnecting() {
}
}

// newBinanceSubscriptionMsg returns a new subscription Msg
// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
func (p *BinanceProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

for _, cp := range cps {
p.subscribedPairs[cp.String()] = cp
}
}

// subscribePairs write the subscription msg to the provider.
func (p *BinanceProvider) subscribePairs(pairs ...string) error {
subsMsg := newBinanceSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}

// currencyPairToBinanceTickerPair receives a currency pair and return binance
// ticker symbol atomusdt@ticker.
func currencyPairToBinanceTickerPair(cp types.CurrencyPair) string {
return strings.ToLower(cp.String() + "@ticker")
}

// currencyPairToBinanceCandlePair receives a currency pair and return binance
// candle symbol atomusdt@kline_1m.
func currencyPairToBinanceCandlePair(cp types.CurrencyPair) string {
return strings.ToLower(cp.String() + "@kline_1m")
}

// newBinanceSubscriptionMsg returns a new subscription Msg.
func newBinanceSubscriptionMsg(params ...string) BinanceSubscriptionMsg {
return BinanceSubscriptionMsg{
Method: "SUBSCRIBE",
Expand Down
6 changes: 6 additions & 0 deletions price-feeder/oracle/provider/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,9 @@ func TestBinanceProvider_GetTickerPrices(t *testing.T) {
require.Nil(t, prices)
})
}

func TestBinanceCurrencyPairToBinancePair(t *testing.T) {
cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"}
binanceSymbol := currencyPairToBinanceTickerPair(cp)
require.Equal(t, binanceSymbol, "atomusdt@ticker")
}
Loading