Skip to content

Commit

Permalink
refactor: Remove unnecessary instances of keepReconnecting (#1442)
Browse files Browse the repository at this point in the history
* Remove unnecessary instances of keepReconnecting

* Log error in reconnect instead of returning error

* Remove unused ping method

* Remove redundant error check

* Restore ping in Kraken provider

* Update CHANGELOG.md

Co-authored-by: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com>
(cherry picked from commit 77fa5a6)

# Conflicts:
#	price-feeder/CHANGELOG.md
  • Loading branch information
rbajollari authored and mergify[bot] committed Oct 3, 2022
1 parent ae66523 commit 17faad1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 75 deletions.
5 changes: 5 additions & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#1175](https://github.com/umee-network/umee/pull/1175) Add type ProviderName.
- [#1255](https://github.com/umee-network/umee/pull/1255) Move TickerPrice and CandlePrice to types package
- [#1374](https://github.com/umee-network/umee/pull/1374) Add standard for telemetry metrics.
<<<<<<< HEAD
=======
- [#1431](https://github.com/umee-network/umee/pull/1431) Convert floats to sdk decimal using helper functions in all providers.
- [#1442](https://github.com/umee-network/umee/pull/1442) Remove unnecessary method in recconection logic.
>>>>>>> 77fa5a6 (refactor: Remove unnecessary instances of keepReconnecting (#1442))
### Features

Expand Down
42 changes: 6 additions & 36 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,65 +329,35 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
p.messageReceived(messageType, bz)

case <-reconnectTicker.C:
if err := p.disconnect(); err != nil {
p.logger.Err(err).Msg("error disconnecting")
}
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting")
p.keepReconnecting()
}
}
}
}

// disconnect disconnects the existing websocket connection.
func (p *BinanceProvider) disconnect() error {
err := p.wsClient.Close()
if err != nil {
return types.ErrProviderConnection.Wrapf("error closing Binance websocket %v", err)
}
return nil
}

// reconnect closes the last WS connection then create a new one and subscribe to
// all subscribed pairs in the ticker and candle pais. A single connection to
// stream.binance.com is only valid for 24 hours; expect to be disconnected at the
// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If
// the websocket server does not receive a pong frame back from the connection
// within a 10 minute period, the connection will be disconnected.
func (p *BinanceProvider) reconnect() error {
err := p.wsClient.Close()
if err != nil {
p.logger.Err(err).Msg("error closing binance websocket")
}

p.logger.Debug().Msg("reconnecting websocket")
wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error reconnect to binance websocket: %w", err)
}
p.wsClient = wsConn

currencyPairs := p.subscribedPairsToSlice()

telemetryWebsocketReconnect(ProviderBinance)
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
func (p *BinanceProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
connectionTries++
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
return
}
return p.subscribeChannels(p.subscribedPairsToSlice()...)
}

// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
Expand Down
47 changes: 8 additions & 39 deletions price-feeder/oracle/provider/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,13 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
if err != nil {
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
p.logger.Err(err).Msg("WebSocket closed unexpectedly")
p.keepReconnecting()
continue
}

// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
p.logger.Err(err).Msg("failed to send ping")
p.keepReconnecting()
}
continue
}
Expand All @@ -281,12 +279,8 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
p.messageReceived(messageType, bz)

case <-reconnectTicker.C:
if err := p.disconnect(); err != nil {
p.logger.Err(err).Msg("error disconnecting")
}
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("attempted to reconnect")
p.keepReconnecting()
p.logger.Err(err).Msg("error reconnecting")
}
}
}
Expand Down Expand Up @@ -463,50 +457,23 @@ func (p *KrakenProvider) messageReceivedCandle(bz []byte) error {
return nil
}

// disconnect disconnects the existing websocket connection.
func (p *KrakenProvider) disconnect() error {
// reconnect closes the last WS connection then create a new one.
func (p *KrakenProvider) reconnect() error {
err := p.wsClient.Close()
if err != nil {
return types.ErrProviderConnection.Wrapf("error closing Kraken websocket %v", err)
p.logger.Err(err).Msg("error closing Kraken websocket")
}
return nil
}

// reconnect creates a new websocket connection.
func (p *KrakenProvider) reconnect() error {
p.logger.Debug().Msg("trying to reconnect")

wsConn, resp, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error connecting to Kraken websocket: %w", err)
}
p.wsClient = wsConn

currencyPairs := p.subscribedPairsToSlice()

telemetryWebsocketReconnect(ProviderKraken)
return p.subscribeChannels(currencyPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in recconnect.
func (p *KrakenProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
connectionTries++
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
return
}
return p.subscribeChannels(p.subscribedPairsToSlice()...)
}

// messageReceivedSubscriptionStatus handle the subscription status message
Expand Down Expand Up @@ -543,7 +510,9 @@ func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) {
return
}

p.keepReconnecting()
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting")
}
}

// setTickerPair sets an ticker to the map thread safe by the mutex.
Expand Down

0 comments on commit 17faad1

Please sign in to comment.