Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: support fiat conversion for assets #1600

Merged
merged 2 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/asset/doge/doge.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var (
}
// WalletInfo defines some general information about a Dogecoin wallet.
WalletInfo = &asset.WalletInfo{
Name: "Doge",
Name: "Dogecoin",
Version: version,
UnitInfo: dexdoge.UnitInfo,
AvailableWallets: []*asset.WalletDefinition{{
Expand Down
253 changes: 253 additions & 0 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,12 @@ type Core struct {

sentCommitsMtx sync.Mutex
sentCommits map[order.Commitment]chan struct{}

ratesMtx sync.RWMutex
fiatRateSources map[string]*commonRateSource
// stopFiatRateFetching will be used to shutdown fetchFiatExchangeRates
// goroutine when all rate sources have been disabled.
stopFiatRateFetching context.CancelFunc
}

// New is the constructor for a new Core.
Expand Down Expand Up @@ -1227,6 +1233,8 @@ func New(cfg *Config) (*Core, error) {
locale: locale,
localePrinter: message.NewPrinter(lang),
seedGenerationTime: seedGenerationTime,

fiatRateSources: make(map[string]*commonRateSource),
}

// Populate the initial user data. User won't include any DEX info yet, as
Expand Down Expand Up @@ -1259,6 +1267,36 @@ func (c *Core) Run(ctx context.Context) {
c.latencyQ.Run(ctx)
}()

// Skip rate fetch setup if on simnet. Rate fetching maybe enabled if
// desired.
if c.cfg.Net != dex.Simnet {
c.ratesMtx.Lock()
// Retrieve disabled fiat rate sources from database.
disabledSources, err := c.db.DisabledRateSources()
if err != nil {
c.log.Errorf("Unable to retrieve disabled fiat rate source: %v", err)
}

// Construct enabled fiat rate sources.
fetchers:
for token, rateFetcher := range fiatRateFetchers {
for _, v := range disabledSources {
if token == v {
continue fetchers
}
}
c.fiatRateSources[token] = newCommonRateSource(rateFetcher)
}

// Start goroutine for fiat rate fetcher's if we have at least one source.
if len(c.fiatRateSources) != 0 {
c.fetchFiatExchangeRates()
} else {
c.log.Debug("no fiat rate source initialized")
}
c.ratesMtx.Unlock()
}

c.wg.Wait() // block here until all goroutines except DB complete

// Stop the DB after dexConnections and other goroutines are done.
Expand Down Expand Up @@ -1707,6 +1745,7 @@ func (c *Core) User() *User {
Exchanges: c.Exchanges(),
Initialized: c.IsInitialized(),
SeedGenerationTime: c.seedGenerationTime,
FiatRates: c.fiatConversions(),
}
}

Expand Down Expand Up @@ -7723,3 +7762,217 @@ func (c *Core) findActiveOrder(oid order.OrderID) (*trackedTrade, error) {
}
return nil, fmt.Errorf("could not find active order with order id: %s", oid)
}

// fetchFiatExchangeRates starts the fiat rate fetcher goroutine and schedules
// refresh cycles. Use under ratesMtx lock.
func (c *Core) fetchFiatExchangeRates() {
if c.stopFiatRateFetching != nil {
c.log.Debug("Fiat exchange rate fetching is already enabled")
return
}
ctx, cancel := context.WithCancel(c.ctx)
c.stopFiatRateFetching = cancel

c.log.Debug("starting fiat rate fetching")

c.wg.Add(1)
go func() {
defer c.wg.Done()
tick := time.NewTimer(fiatRateRequestInterval)
defer tick.Stop()
for {

c.refreshFiatRates(ctx)

select {
case <-ctx.Done():
return
case <-tick.C:
}
}
}()
}

// refreshFiatRates refreshes the fiat rates for rate sources whose values have
// not been updated since fiatRateRequestInterval. It also checks if fiat rates
// are expired and does some clean-up.
func (c *Core) refreshFiatRates(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

var wg sync.WaitGroup
supportedAssets := c.SupportedAssets()
c.ratesMtx.RLock()
for _, source := range c.fiatRateSources {
wg.Add(1)
go func(source *commonRateSource) {
defer wg.Done()
source.refreshRates(ctx, c.log, supportedAssets)
}(source)
}
c.ratesMtx.RUnlock()
wg.Wait()

// Remove expired rate source if any.
c.removeExpiredRateSources()

fiatRatesMap := c.fiatConversions()
if len(fiatRatesMap) != 0 {
c.notify(newFiatRatesUpdate(fiatRatesMap))
}
}

// FiatRateSources returns a list of fiat rate sources and their individual
// status.
func (c *Core) FiatRateSources() map[string]bool {
c.ratesMtx.RLock()
defer c.ratesMtx.RUnlock()
rateSources := make(map[string]bool, len(fiatRateFetchers))
for token := range fiatRateFetchers {
rateSources[token] = c.fiatRateSources[token] != nil
}
return rateSources
}

// fiatConversions returns fiat rate for all supported assets that have a
// wallet.
func (c *Core) fiatConversions() map[uint32]float64 {
supportedAssets := asset.Assets()

c.ratesMtx.RLock()
defer c.ratesMtx.RUnlock()
fiatRatesMap := make(map[uint32]float64, len(supportedAssets))
for assetID := range supportedAssets {
var rateSum float64
var sources int
for _, source := range c.fiatRateSources {
rateInfo := source.assetRate(assetID)
if rateInfo != nil && time.Since(rateInfo.lastUpdate) < fiatRateDataExpiry {
sources++
rateSum += rateInfo.rate
}
}
if rateSum != 0 {
fiatRatesMap[assetID] = rateSum / float64(sources) // get average rate.
}
}
return fiatRatesMap
}

// ToggleRateSourceStatus toggles a fiat rate source status. If disable is true,
// the fiat rate source is disabled, otherwise the rate source is enabled.
func (c *Core) ToggleRateSourceStatus(source string, disable bool) error {
if disable {
return c.disableRateSource(source)
}
return c.enableRateSource(source)
}

// enableRateSource enables a fiat rate source.
func (c *Core) enableRateSource(source string) error {
// Check if it's an invalid rate source or it is already enabled.
rateFetcher, found := fiatRateFetchers[source]
if !found {
return errors.New("cannot enable unknown fiat rate source")
}

c.ratesMtx.Lock()
defer c.ratesMtx.Unlock()
if c.fiatRateSources[source] != nil {
return nil // already enabled.
}

// Build fiat rate source.
rateSource := newCommonRateSource(rateFetcher)
c.fiatRateSources[source] = rateSource

// If this is our first fiat rate source, start fiat rate fetcher goroutine,
// else fetch rates.
if len(c.fiatRateSources) == 1 {
c.fetchFiatExchangeRates()
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
} else {
go func() {
supportedAssets := c.SupportedAssets() // not with ratesMtx locked!
ctx, cancel := context.WithTimeout(c.ctx, 4*time.Second)
defer cancel()
rateSource.refreshRates(ctx, c.log, supportedAssets)
}()
}

// Update disabled fiat rate source.
c.saveDisabledRateSources()

c.log.Infof("Enabled %s to fetch fiat rates.", source)
return nil
}

// disableRateSource disables a fiat rate source.
func (c *Core) disableRateSource(source string) error {
// Check if it's an invalid fiat rate source or it is already
// disabled.
_, found := fiatRateFetchers[source]
if !found {
return errors.New("cannot disable unknown fiat rate source")
}

c.ratesMtx.Lock()
defer c.ratesMtx.Unlock()

if c.fiatRateSources[source] == nil {
return nil // already disabled.
}

// Remove fiat rate source.
delete(c.fiatRateSources, source)

// Save disabled fiat rate sources to database.
c.saveDisabledRateSources()

c.log.Infof("Disabled %s from fetching fiat rates.", source)
return nil
}

// removeExpiredRateSources disables expired fiat rate source.
func (c *Core) removeExpiredRateSources() {
c.ratesMtx.Lock()
defer c.ratesMtx.Unlock()

// Remove fiat rate source with expired exchange rate data.
var disabledSources []string
for token, source := range c.fiatRateSources {
if source.isExpired(fiatRateDataExpiry) {
delete(c.fiatRateSources, token)
disabledSources = append(disabledSources, token)
}
}

// Ensure disabled fiat rate fetchers are saved to database.
if len(disabledSources) > 0 {
c.saveDisabledRateSources()
c.log.Warnf("Expired rate source(s) has been disabled: %v", strings.Join(disabledSources, ", "))
}
}

// saveDisabledRateSources saves disabled fiat rate sources to database and
// shuts down rate fetching if there are no exchange rate source. Use under
// ratesMtx lock.
func (c *Core) saveDisabledRateSources() {
var disabled []string
for token := range fiatRateFetchers {
if c.fiatRateSources[token] == nil {
disabled = append(disabled, token)
}
}

// Shutdown rate fetching if there are no exchange rate source.
if len(c.fiatRateSources) == 0 && c.stopFiatRateFetching != nil {
c.stopFiatRateFetching()
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
c.stopFiatRateFetching = nil
c.log.Debug("shutting down rate fetching")
}

err := c.db.SaveDisabledRateSources(disabled)
if err != nil {
c.log.Errorf("Unable to save disabled fiat rate source to database: %v", err)
}
}
Loading