Skip to content

Commit

Permalink
multi: support fiat conversion for assets
Browse files Browse the repository at this point in the history
This enables the conversion of assets value to fiat equivalent.
- Modify wallets page to display fiat value for asset if asset exchange rate is available.
- Modify settings page to display available exchange rate source.
- Modify order verify form to display value for base and quote assets.
- Add new enpoint to support enabling and disabling of exchange rate source.
- Add tests to cover exchange rate fetching.
  • Loading branch information
ukane-philemon committed Jun 28, 2022
1 parent 5fb9657 commit 03fabed
Show file tree
Hide file tree
Showing 40 changed files with 1,188 additions and 42 deletions.
2 changes: 1 addition & 1 deletion client/asset/doge/doge.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var (
}
// WalletInfo defines some general information about a Dogecoin wallet.
WalletInfo = &asset.WalletInfo{
Name: "Doge",
Name: "Dogecoin",
Version: version,
UnitInfo: dexdoge.UnitInfo,
AvailableWallets: []*asset.WalletDefinition{{
Expand Down
247 changes: 247 additions & 0 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,9 @@ type Core struct {

sentCommitsMtx sync.Mutex
sentCommits map[order.Commitment]chan struct{}

ratesMtx sync.Mutex
exchangeRateSources map[string]*commonSource
}

// New is the constructor for a new Core.
Expand Down Expand Up @@ -1225,6 +1228,8 @@ func New(cfg *Config) (*Core, error) {
locale: locale,
localePrinter: message.NewPrinter(lang),
seedGenerationTime: seedGenerationTime,

exchangeRateSources: make(map[string]*commonSource),
}

// Populate the initial user data. User won't include any DEX info yet, as
Expand Down Expand Up @@ -1257,6 +1262,34 @@ func (c *Core) Run(ctx context.Context) {
c.latencyQ.Run(ctx)
}()

// Retrieve disabled exchange rate sources from database.
disabledSources, err := c.db.DisabledRateSources()
if err != nil {
c.log.Errorf("Unable to retrieve disabled exchange rate source: %v", err)
}

// Construct an exchange rate source.
for token, rateFetcher := range exchangeRateFetchers {
var isDisabled bool
for _, v := range disabledSources {
if token == v {
isDisabled = true
break
}
}
if !isDisabled {
c.exchangeRateSources[token] = newcommonSource(c.log, rateFetcher)
}
}

// Start goroutine for exchange rate fetcher's if we have at least one
// exchange rate source enabled.
if len(c.exchangeRateSources) > 0 {
c.fetchExchangeRates()
} else {
c.log.Debugf("no exchange rate source was initialized")
}

c.wg.Wait() // block here until all goroutines except DB complete

// Stop the DB after dexConnections and other goroutines are done.
Expand Down Expand Up @@ -1705,6 +1738,8 @@ func (c *Core) User() *User {
Exchanges: c.Exchanges(),
Initialized: c.IsInitialized(),
SeedGenerationTime: c.seedGenerationTime,
FiatRates: c.fetchAllFiatRates(),
DisableConversion: c.isConversionDisabled(),
}
}

Expand Down Expand Up @@ -7446,3 +7481,215 @@ func (c *Core) findActiveOrder(oid order.OrderID) (*trackedTrade, error) {
}
return nil, fmt.Errorf("could not find active order with order id: %s", oid)
}

// fetchExchangeRates starts the exchange rate source goroutine and schedules
// refresh cycles.
func (core *Core) fetchExchangeRates() {
core.wg.Add(1)
go func() {
defer core.wg.Done()
tick := time.NewTimer(0) // starts rate fetching immediately.
for {
select {
case <-tick.C:
if core.isConversionDisabled() {
tick.Stop()
return
}
core.refreshExchangeRate()
case <-core.ctx.Done():
tick.Stop()
return
}
tick = core.nextRateSourceTick()
}
}()
}

// nextRateSourceTick checks the rate source' last request, and
// calculates when the next cycle should run.
func (core *Core) nextRateSourceTick() *time.Timer {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

minTick := 10 * time.Second
tOldest := time.Now()
for _, rateSource := range core.exchangeRateSources {
t := rateSource.lastTry()
if t.Before(tOldest) {
tOldest = t
}
}
tilNext := fiatRateRequestInterval - time.Since(tOldest)
if tilNext < minTick {
tilNext = minTick
}
return time.NewTimer(tilNext)
}

// ExchangeRateSources returns a list of exchange rate sources and their
// individual status.
func (core *Core) ExchangeRateSources() map[string]bool {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

rateSources := make(map[string]bool, len(exchangeRateFetchers))
for token := range exchangeRateFetchers {
rateSources[token] = core.exchangeRateSources[token] != nil
}
return rateSources
}

// fetchAllFiatRates returns exchange rate information for all supported assets
// that have a wallet. It also checks if exchange rates are expired and does
// some clean-up.
func (core *Core) fetchAllFiatRates() map[uint32]float64 {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()
supportedAssets := core.SupportedAssets()
fiatRatesMap := make(map[uint32]float64, len(supportedAssets))
for assetID := range supportedAssets {
var rateSum float64
var sources int32
for token, source := range core.exchangeRateSources {
// Remove exchange rate source with expired exchange rate data.
if source.isExpired(fiatRateDataExpiry) {
delete(core.exchangeRateSources, token)
continue
}
rateInfo, found := source.assetRate(assetID)
if found && time.Since(rateInfo.lastUpdate) < fiatRateDataExpiry {
sources++
rateSum += rateInfo.rate
}
}
if rateSum != 0 {
fiatRatesMap[assetID] = rateSum / float64(sources) // get average rate.
}
}

// Save disabled exchange rate source to database.
core.saveDisabledRateSources()

return fiatRatesMap
}

// isConversionDisabled checks if fiat rate fetch for assets is disabled.
// This is when either no exchange rate source is enabled or rates are
// not up-to-date. Individual exchange rate source can be disabled without
// disabling rate fetching, as long as there is at least one exchange
// rate source.
func (core *Core) isConversionDisabled() bool {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()
return len(core.exchangeRateSources) == 0
}

// ToggleRateSourceStatus toggles an exchange rate source status. If
// disable is true, the exchange rate source is disabled, otherwise the rate
// source is enabled.
func (core *Core) ToggleRateSourceStatus(source string, disable bool) error {
if disable {
return core.disableRateSource(source)
}
return core.enableRateSource(source)
}

// enableRateSource enables an exchange rate source.
func (core *Core) enableRateSource(source string) error {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

// Check if it's an invalid rate source or it is already enabled.
rateFetcher, found := exchangeRateFetchers[source]
if !found {
return errors.New("cannot enable unknown exchange rate source")
} else if core.exchangeRateSources[source] != nil {
return nil // already enabled.
}

// Build exchange rate source.
core.exchangeRateSources[source] = newcommonSource(core.log, rateFetcher)

// If this is our first exchange rate source, start exchange rate fetcher
// gorountine, else fetch rates.
if len(core.exchangeRateSources) == 1 {
core.fetchExchangeRates()
} else {
go func() {
ctx, cancel := context.WithTimeout(core.ctx, 4*time.Second)
defer cancel()
core.exchangeRateSources[source].refreshRates(ctx, core.assetMap())
}()
}

// Update disabled exchange rate source.
core.saveDisabledRateSources()

core.log.Infof("Enabled %s to fetch exchange rates.", source)
return nil
}

// disableRateSource disables an exchange rate source.
func (core *Core) disableRateSource(source string) error {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

// Check if it's an invalid exchange rate source or it is already disabled.
_, found := exchangeRateFetchers[source]
if !found {
return errors.New("cannot disable unknown exchange rate source")
} else if core.exchangeRateSources[source] == nil {
return nil // already disabled.
}

// Remove exchange rate source.
delete(core.exchangeRateSources, source)

// Save disabled exchange rate sources to database.
core.saveDisabledRateSources()

core.log.Infof("Disabled %s from fetching exchange rates.", source)
return nil
}

// saveDisabledRateSources save disabled exchange rate sources to database.
// use under ratesMtx lock.
func (core *Core) saveDisabledRateSources() {
var disabled []string
for token := range exchangeRateFetchers {
if core.exchangeRateSources[token] == nil {
disabled = append(disabled, token)
}
}
err := core.db.SaveDisabledRateSources(disabled)
if err != nil {
core.log.Errorf("Unable to save disabled exchange rate source to database: %v", err)
}
}

// refreshExchangeRate refreshes the exchange rates for exchange rate sources
// whose values have not been updated since fiatRateRequestInterval.
func (core *Core) refreshExchangeRate() {
ctx, cancel := context.WithTimeout(core.ctx, 4*time.Second)
defer cancel()

var wg sync.WaitGroup
core.ratesMtx.Lock()
for _, source := range core.exchangeRateSources {
if time.Since(source.lastTry()) > fiatRateRequestInterval {
wg.Add(1)
go func(source *commonSource) {
defer wg.Done()
source.refreshRates(ctx, core.assetMap())
}(source)
}
}
core.ratesMtx.Unlock()
wg.Wait()

fiatRatesMap := core.fetchAllFiatRates()
if len(fiatRatesMap) != 0 {
core.notify(newFiatRatesUpdate(fiatRatesMap))
}
}
Loading

0 comments on commit 03fabed

Please sign in to comment.