Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

support Bitflyer streaming #27

Merged
merged 18 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<modules>
<module>xchange-stream-core</module>
<module>service-pubnub</module>
<module>service-pusher</module>
<module>service-netty</module>
<module>service-wamp</module>
Expand All @@ -21,6 +22,7 @@
<module>xchange-bitfinex</module>
<module>xchange-bitmex</module>
<module>xchange-poloniex2</module>
<module>xchange-bitflyer</module>
</modules>

<name>xchange-stream</name>
Expand Down
26 changes: 26 additions & 0 deletions service-pubnub/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>service-pubnub</artifactId>

<dependencies>
<dependency>
<groupId>com.pubnub</groupId>
<artifactId>pubnub-gson</artifactId>
<version>4.14.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.6</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package info.bitrich.xchangestream.service.pubnub;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pubnub.api.PNConfiguration;
import com.pubnub.api.PubNub;
import com.pubnub.api.callbacks.SubscribeCallback;
import com.pubnub.api.enums.PNStatusCategory;
import com.pubnub.api.models.consumer.PNStatus;
import com.pubnub.api.models.consumer.pubsub.PNMessageResult;
import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Created by Lukas Zaoralek on 14.11.17.
*/
public class PubnubStreamingService {
private static final Logger LOG = LoggerFactory.getLogger(PubnubStreamingService.class);

private final PubNub pubnub;
private PNStatusCategory pnStatusCategory;
private final Map<String, ObservableEmitter<JsonNode>> subscriptions = new ConcurrentHashMap<>();
private final ObjectMapper mapper;

public PubnubStreamingService(String publicKey) {
mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
PNConfiguration pnConfiguration = new PNConfiguration();
pnConfiguration.setSubscribeKey(publicKey);
pubnub = new PubNub(pnConfiguration);
pnStatusCategory = PNStatusCategory.PNDisconnectedCategory;
}

public Completable connect() {
return Completable.create(e -> {
pubnub.addListener(new SubscribeCallback() {
@Override
public void status(PubNub pubNub, PNStatus pnStatus) {
pnStatusCategory = pnStatus.getCategory();
LOG.debug("PubNub status: {} {}", pnStatusCategory.toString(), pnStatus.getStatusCode());
if (pnStatus.getCategory() == PNStatusCategory.PNConnectedCategory) {
// e.onComplete();
} else if (pnStatus.isError()) {
// e.onError(pnStatus.getErrorData().getThrowable());
}
}

@Override
public void message(PubNub pubNub, PNMessageResult pnMessageResult) {
String channelName = pnMessageResult.getChannel();
ObservableEmitter<JsonNode> subscription = subscriptions.get(channelName);
LOG.debug("PubNub Message: {}", pnMessageResult.toString());
if (subscription != null) {
JsonNode jsonMessage = null;
try {
jsonMessage = mapper.readTree(pnMessageResult.getMessage().toString());
} catch (IOException ex) {
ex.printStackTrace();
}
subscription.onNext(jsonMessage);
} else {
LOG.debug("No subscriber for channel {}.", channelName);
}
}

@Override
public void presence(PubNub pubNub, PNPresenceEventResult pnPresenceEventResult) {
LOG.debug("PubNub Message: {}", pnPresenceEventResult.toString());
}
});
e.onComplete();
});
}

public Observable<JsonNode> subscribeChannel(String channelName) {
LOG.info("Subscribing to channel {}.", channelName);
return Observable.<JsonNode>create(e -> {
if (!subscriptions.containsKey(channelName)) {
subscriptions.put(channelName, e);
pubnub.subscribe().channels(Collections.singletonList(channelName)).execute();
LOG.debug("Subscribe channel: {}", channelName);
}
}).doOnDispose(() -> {
LOG.debug("Unsubscribe channel: {}", channelName);
pubnub.unsubscribe().channels(Collections.singletonList(channelName)).execute();
}).share();
}

public Completable disconnect() {
return Completable.create(completable -> {
pubnub.disconnect();
completable.onComplete();
});
}
}
26 changes: 26 additions & 0 deletions xchange-bitflyer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xchange-stream-parent</artifactId>
<groupId>info.bitrich.xchange-stream</groupId>
<version>4.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>xchange-bitflyer</artifactId>

<dependencies>
<dependency>
<groupId>info.bitrich.xchange-stream</groupId>
<artifactId>xchange-stream-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>info.bitrich.xchange-stream</groupId>
<artifactId>service-pubnub</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package info.bitrich.xchangestream.bitflyer;

import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.pubnub.PubnubStreamingService;
import io.reactivex.Completable;
import org.knowm.xchange.BaseExchange;
import org.knowm.xchange.ExchangeSpecification;
import si.mazi.rescu.SynchronizedValueFactory;

/**
* Created by Lukas Zaoralek on 14.11.17.
*/
public class BitflyerStreamingExchange extends BaseExchange implements StreamingExchange {
private static final String API_KEY = "sub-c-52a9ab50-291b-11e5-baaa-0619f8945a4f";

private final PubnubStreamingService streamingService;
private BitflyerStreamingMarketDataService streamingMarketDataService;

public BitflyerStreamingExchange() {
this.streamingService = new PubnubStreamingService(API_KEY);
}

@Override
protected void initServices() {
streamingMarketDataService = new BitflyerStreamingMarketDataService(streamingService);
}

@Override
public Completable connect() {
return streamingService.connect();
}

@Override
public Completable disconnect() {
return streamingService.disconnect();
}

@Override
public SynchronizedValueFactory<Long> getNonceFactory() {
return null;
}

@Override
public ExchangeSpecification getDefaultExchangeSpecification() {
ExchangeSpecification spec = new ExchangeSpecification("Bitflyer");
spec.setShouldLoadRemoteMetaData(false);
return spec;
}

@Override
public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package info.bitrich.xchangestream.bitflyer;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitflyer.dto.*;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.pubnub.PubnubStreamingService;
import io.reactivex.Observable;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* Created by Lukas Zaoralek on 14.11.17.
*/
public class BitflyerStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(BitflyerStreamingMarketDataService.class);

private final PubnubStreamingService streamingService;

private final Map<CurrencyPair, BitflyerOrderbook> orderbooks = new HashMap<>();
private final ObjectMapper mapper;

public BitflyerStreamingMarketDataService(PubnubStreamingService streamingService) {
this.streamingService = streamingService;
mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
String channelOrderbookSnapshotName = "lightning_board_snapshot_" + currencyPair.base.toString() + "_" +
currencyPair.counter.toString();
String channelOrderbookUpdatesName = "lightning_board_" + currencyPair.base.toString() + "_" +
currencyPair.counter.toString();

Observable<BitflyerOrderbook> snapshotTransactions = streamingService.subscribeChannel
(channelOrderbookSnapshotName).map(s -> {
BitflyerPubNubOrderbookTransaction transaction = mapper.readValue(s.toString(), BitflyerPubNubOrderbookTransaction.class);
BitflyerOrderbook bitflyerOrderbook = transaction.toBitflyerOrderbook(currencyPair);
orderbooks.put(currencyPair, bitflyerOrderbook);
return bitflyerOrderbook;
});

Observable<BitflyerOrderbook> updateTransactions = streamingService.subscribeChannel(channelOrderbookUpdatesName)
.filter(s -> orderbooks.containsKey(currencyPair))
.map(s -> {
BitflyerOrderbook bitflyerOrderbook = orderbooks.get(currencyPair);
BitflyerPubNubOrderbookTransaction transaction = mapper.readValue(s.toString(), BitflyerPubNubOrderbookTransaction.class);
BitflyerLimitOrder[] asks = transaction.getAsks();
BitflyerLimitOrder[] bids = transaction.getBids();
bitflyerOrderbook.updateLevels(asks, Order.OrderType.ASK);
bitflyerOrderbook.updateLevels(bids, Order.OrderType.BID);
return bitflyerOrderbook;
});

return updateTransactions.mergeWith(snapshotTransactions).map(BitflyerOrderbook::toOrderBook);
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
String channelName = "lightning_ticker_" + currencyPair.base.toString() + "_" + currencyPair.counter.toString();
Observable<BitflyerTicker> tickerTransactions = streamingService.subscribeChannel(channelName).map(s -> {
BitflyerPubNubTickerTransaction transaction = mapper.readValue(s.toString(), BitflyerPubNubTickerTransaction.class);
return transaction.toBitflyerTicker();
});

return tickerTransactions.map(BitflyerTicker::toTicker);
}

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
String channelName = "lightning_executions_" + currencyPair.base.toString() + "_" + currencyPair.counter.toString();
Observable<BitflyerTrade> tradeTransactions = streamingService.subscribeChannel(channelName).flatMapIterable(s -> {
BitflyerPubNubTradesTransaction transaction = new BitflyerPubNubTradesTransaction(s);
return transaction.toBitflyerTrades();
});

return tradeTransactions.map(s -> s.toTrade(currencyPair));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package info.bitrich.xchangestream.bitflyer.dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.LimitOrder;

import java.math.BigDecimal;

/**
* Created by Lukas Zaoralek on 14.11.17.
*/
public class BitflyerLimitOrder {
private final BigDecimal price;
private final BigDecimal size;

public BitflyerLimitOrder(@JsonProperty("price") BigDecimal price,
@JsonProperty("size") BigDecimal size) {
this.price = price;
this.size = size;
}

public BigDecimal getPrice() {
return price;
}

public BigDecimal getSize() {
return size;
}

public LimitOrder toLimitOrder(CurrencyPair pair, Order.OrderType side) {
return new LimitOrder(side, size, pair, "", null, price);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package info.bitrich.xchangestream.bitflyer.dto;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
* Created by Lukas Zaoralek on 15.11.17.
*/
public abstract class BitflyerMarketEvent {
protected final String timestamp;

BitflyerMarketEvent(String timestamp) {
this.timestamp = timestamp;
}

public String getTimestamp() {
return timestamp;
}

public Date getDate() {
SimpleDateFormat formatter;
formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
Date date = null;
try {
date = formatter.parse(timestamp.substring(0, 23));
} catch (ParseException e) {
e.printStackTrace();
}
return date;
}
}
Loading