Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binance futures user data stream #4754

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

@Path("")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -208,4 +209,32 @@ List<BinanceCancelledOrder> cancelAllFutureOpenOrders(
@QueryParam(SIGNATURE) ParamsDigest signature)
throws IOException, BinanceException;

/**
* Returns a listen key for websocket login.
*
* @param apiKey the api key
* @return
* @throws BinanceException
* @throws IOException
*/
@POST
@Path("fapi/v1/listenKey")
BinanceListenKey startUserDataStream(@HeaderParam(X_MBX_APIKEY) String apiKey)
throws IOException, BinanceException;

/**
* Keeps the authenticated websocket session alive.
*
* @param apiKey the api key
* @param listenKey the api secret
* @return
* @throws BinanceException
* @throws IOException
*/
@PUT
@Path("fapi/v1/listenKey?listenKey={listenKey}")
Map<?, ?> keepAliveUserDataStream(
@HeaderParam(X_MBX_APIKEY) String apiKey, @PathParam("listenKey") String listenKey)
throws IOException, BinanceException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ public class BinanceStreamingAccountService implements StreamingAccountService {
private final Subject<OutboundAccountPositionBinanceWebsocketTransaction> accountInfoPublisher =
accountInfoLast.toSerialized();

private volatile Disposable accountInfo;
private volatile BinanceUserDataStreamingService binanceUserDataStreamingService;
protected volatile Disposable accountInfo;
protected volatile BinanceUserDataStreamingService binanceUserDataStreamingService;

private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
protected final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

public BinanceStreamingAccountService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
Expand Down Expand Up @@ -80,7 +80,7 @@ public void openSubscriptions() {
* URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted
* stream.
*/
void setUserDataStreamingService(
public void setUserDataStreamingService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
if (accountInfo != null && !accountInfo.isDisposed()) accountInfo.dispose();
this.binanceUserDataStreamingService = binanceUserDataStreamingService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package info.bitrich.xchangestream.binance;

import info.bitrich.xchangestream.binance.BinanceUserDataChannel.NoActiveChannelException;
import info.bitrich.xchangestream.binance.exceptions.NoActiveChannelException;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.service.netty.ConnectionStateModel.State;
Expand Down Expand Up @@ -33,18 +33,18 @@ public class BinanceStreamingExchange extends BinanceExchange implements Streami
public static final String USE_HIGHER_UPDATE_FREQUENCY = "Binance_Orderbook_Use_Higher_Frequency";
public static final String USE_REALTIME_BOOK_TICKER = "Binance_Ticker_Use_Realtime";
public static final String FETCH_ORDER_BOOK_LIMIT = "Binance_Fetch_Order_Book_Limit";
private BinanceStreamingService streamingService;
private BinanceUserDataStreamingService userDataStreamingService;
public BinanceStreamingService streamingService;
protected BinanceUserDataStreamingService userDataStreamingService;

private BinanceStreamingMarketDataService streamingMarketDataService;
protected BinanceStreamingMarketDataService streamingMarketDataService;
private BinanceStreamingAccountService streamingAccountService;
private BinanceStreamingTradeService streamingTradeService;

private BinanceUserDataChannel userDataChannel;
private Runnable onApiCall;
private String orderBookUpdateFrequencyParameter = "";
private int oderBookFetchLimitParameter = 1000;
private boolean realtimeOrderBookTicker;
public BinanceUserDataChannel userDataChannel;
public Runnable onApiCall;
public String orderBookUpdateFrequencyParameter = "";
public int oderBookFetchLimitParameter = 1000;
public boolean realtimeOrderBookTicker;

@Override
protected void initServices() {
Expand Down Expand Up @@ -120,7 +120,7 @@ private Completable internalConnect(
BinanceAuthenticated.class, getExchangeSpecification())
.build();
userDataChannel =
new BinanceUserDataChannel(binance, exchangeSpecification.getApiKey(), onApiCall);
new BinanceUserDataChannelImpl(binance, exchangeSpecification.getApiKey(), onApiCall);
try {
completables.add(createAndConnectUserDataService(userDataChannel.getListenKey()));
} catch (NoActiveChannelException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class BinanceStreamingTradeService implements StreamingTradeService {
private final Subject<ExecutionReportBinanceUserTransaction> executionReportsPublisher =
PublishSubject.<ExecutionReportBinanceUserTransaction>create().toSerialized();

private volatile Disposable executionReports;
private volatile BinanceUserDataStreamingService binanceUserDataStreamingService;
protected volatile Disposable executionReports;
protected volatile BinanceUserDataStreamingService binanceUserDataStreamingService;

private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
protected final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

public BinanceStreamingTradeService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
Expand Down Expand Up @@ -90,7 +90,7 @@ public void openSubscriptions() {
* URLs and therefore must act in a publisher fashion so that subscribers get an uninterrupted
* stream.
*/
void setUserDataStreamingService(
public void setUserDataStreamingService(
BinanceUserDataStreamingService binanceUserDataStreamingService) {
if (executionReports != null && !executionReports.isDisposed()) executionReports.dispose();
this.binanceUserDataStreamingService = binanceUserDataStreamingService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package info.bitrich.xchangestream.binance;

import static java.util.concurrent.TimeUnit.SECONDS;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import info.bitrich.xchangestream.binance.exceptions.NoActiveChannelException;
import java.util.function.Consumer;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Binance user data streams must be established by first requesting a unique "listen key" via
Expand All @@ -22,107 +14,20 @@
*
* @author Graham Crockford
*/
class BinanceUserDataChannel implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(BinanceUserDataChannel.class);

private final BinanceAuthenticated binance;
private final String apiKey;
private final Runnable onApiCall;
private final Disposable keepAlive;

private String listenKey;
private Consumer<String> onChangeListenKey;

/**
* Creates the channel, establishing a listen key (immediately available from {@link
* #getListenKey()}) and starting timers to ensure the channel is kept alive.
*
* @param binance Access to binance services.
* @param apiKey The API key.
* @param onApiCall A callback to perform prior to any service calls.
*/
BinanceUserDataChannel(BinanceAuthenticated binance, String apiKey, Runnable onApiCall) {
this.binance = binance;
this.apiKey = apiKey;
this.onApiCall = onApiCall;
openChannel();
// Send a keepalive every 30 minutes as recommended by Binance
this.keepAlive = Observable.interval(30, TimeUnit.MINUTES).subscribe(x -> keepAlive());
}
public interface BinanceUserDataChannel extends AutoCloseable {

/**
* Set this callback to get notified if the current listen key is revoked.
*
* @param onChangeListenKey The callback.
*/
void onChangeListenKey(Consumer<String> onChangeListenKey) {
this.onChangeListenKey = onChangeListenKey;
}

private void keepAlive() {
if (listenKey == null) return;
try {
LOG.debug("Keeping user data channel alive");
onApiCall.run();
binance.keepAliveUserDataStream(apiKey, listenKey);
LOG.debug("User data channel keepalive sent successfully");
} catch (Exception e) {
LOG.error("User data channel keepalive failed.", e);
this.listenKey = null;
reconnect();
}
}

private void reconnect() {
try {
openChannel();
if (onChangeListenKey != null) {
onChangeListenKey.accept(this.listenKey);
}
} catch (Exception e) {
LOG.error("Failed to reconnect. Will retry in 15 seconds.", e);
Observable.timer(15, SECONDS).subscribe(x -> reconnect());
}
}

private void openChannel() {
try {
LOG.debug("Opening new user data channel");
onApiCall.run();
this.listenKey = binance.startUserDataStream(apiKey).getListenKey();
LOG.debug("Opened new user data channel");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
void onChangeListenKey(Consumer<String> onChangeListenKey);

/**
* @return The current listen key.
* @throws NoActiveChannelException If no listen key is currently available.
*/
String getListenKey() throws NoActiveChannelException {
if (listenKey == null) throw new NoActiveChannelException();
return listenKey;
}

@Override
public void close() {
keepAlive.dispose();
}

/**
* Thrown on calls to {@link BinanceUserDataChannel#getListenKey()} if no channel is currently
* open.
*
* @author Graham Crockford
*/
static final class NoActiveChannelException extends Exception {

private static final long serialVersionUID = -8161003286845820286L;
String getListenKey() throws NoActiveChannelException;

NoActiveChannelException() {
super();
}
}
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package info.bitrich.xchangestream.binance;

import static java.util.concurrent.TimeUnit.SECONDS;

import info.bitrich.xchangestream.binance.exceptions.NoActiveChannelException;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinanceUserDataChannelImpl implements BinanceUserDataChannel {

private static final Logger LOG = LoggerFactory.getLogger(BinanceUserDataChannelImpl.class);

private final BinanceAuthenticated binance;
private final String apiKey;
private final Runnable onApiCall;
private final Disposable keepAlive;

private String listenKey;
private Consumer<String> onChangeListenKey;

/**
* Creates the channel, establishing a listen key (immediately available from {@link
* #getListenKey()}) and starting timers to ensure the channel is kept alive.
*
* @param binance Access to binance services.
* @param apiKey The API key.
* @param onApiCall A callback to perform prior to any service calls.
*/
protected BinanceUserDataChannelImpl(BinanceAuthenticated binance, String apiKey, Runnable onApiCall) {
this.binance = binance;
this.apiKey = apiKey;
this.onApiCall = onApiCall;
openChannel();
// Send a keepalive every 30 minutes as recommended by Binance
this.keepAlive = Observable.interval(30, TimeUnit.MINUTES).subscribe(x -> keepAlive());
}

@Override
public void onChangeListenKey(Consumer<String> onChangeListenKey) {
this.onChangeListenKey = onChangeListenKey;
}

private void keepAlive() {
if (listenKey == null) return;
try {
LOG.debug("Keeping user data channel alive");
onApiCall.run();
binance.keepAliveUserDataStream(apiKey, listenKey);
LOG.debug("User data channel keepalive sent successfully");
} catch (Exception e) {
LOG.error("User data channel keepalive failed.", e);
this.listenKey = null;
reconnect();
}
}

private void reconnect() {
try {
openChannel();
if (onChangeListenKey != null) {
onChangeListenKey.accept(this.listenKey);
}
} catch (Exception e) {
LOG.error("Failed to reconnect. Will retry in 15 seconds.", e);
Observable.timer(15, SECONDS).subscribe(x -> reconnect());
}
}

private void openChannel() {
try {
LOG.debug("Opening new user data channel");
onApiCall.run();
this.listenKey = binance.startUserDataStream(apiKey).getListenKey();
LOG.debug("Opened new user data channel");
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public String getListenKey() throws NoActiveChannelException {
if (listenKey == null) throw new NoActiveChannelException();
return listenKey;
}

@Override
public void close() {
keepAlive.dispose();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public enum BinanceWebSocketTypes {
AGG_TRADE("aggTrade"),
TRADE("trade"),
OUTBOUND_ACCOUNT_POSITION("outboundAccountPosition"),
EXECUTION_REPORT("executionReport");
EXECUTION_REPORT("executionReport"),
ORDER_TRADE_UPDATE("ORDER_TRADE_UPDATE"),
ACCOUNT_UPDATE("ACCOUNT_UPDATE");

/**
* Get a type from the `type` string of a `ProductBinanceWebSocketTransaction`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public enum ExecutionType {
REPLACED,
REJECTED,
TRADE,
EXPIRED
EXPIRED,
AMENDMENT,
CALCULATED
}

private final String clientOrderId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package info.bitrich.xchangestream.binance.exceptions;

import info.bitrich.xchangestream.binance.BinanceUserDataChannelImpl;

/**
* Thrown on calls to {@link BinanceUserDataChannelImpl#getListenKey()} if no channel is currently
* open.
*
* @author Graham Crockford
*/
public class NoActiveChannelException extends Exception {

private static final long serialVersionUID = -8161003286845820286L;

public NoActiveChannelException() {
super();
}
}
Loading
Loading