Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Commit

Permalink
Reconnect with channelName instead channelId (#85)
Browse files Browse the repository at this point in the history
#82 Bitfinex reconnect loop
  • Loading branch information
nodarret authored and dozd committed Jan 27, 2018
1 parent a2b8478 commit 4eb7e87
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ public abstract class NettyStreamingService<T> {
private static final Logger LOG = LoggerFactory.getLogger(NettyStreamingService.class);

private class Subscription {
ObservableEmitter<T> emitter;
Object[] args;
final ObservableEmitter<T> emitter;
final String channelName;
final Object[] args;

public Subscription(ObservableEmitter<T> emitter, String channelName, Object[] args) {
this.emitter = emitter;
this.channelName = channelName;
this.args = args;
}
}

private final int maxFramePayloadLength;
Expand Down Expand Up @@ -188,9 +195,7 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
}

if (!channels.containsKey(channelId)) {
Subscription newSubscription = new Subscription();
newSubscription.args = args;
newSubscription.emitter = e;
Subscription newSubscription = new Subscription(e, channelName, args);
channels.put(channelId, newSubscription);
try {
sendMessage(getSubscribeMessage(channelName, args));
Expand All @@ -207,11 +212,12 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
}

public void resubscribeChannels() {
for (String channelName : channels.keySet()) {
for (String channelId : channels.keySet()) {
try {
sendMessage(getSubscribeMessage(channelName, channels.get(channelName).args));
Subscription subscription = channels.get(channelId);
sendMessage(getSubscribeMessage(subscription.channelName, subscription.args));
} catch (IOException e) {
LOG.error("Failed to reconnect channel: {}", channelName);
LOG.error("Failed to reconnect channel: {}", channelId);
}
}
}
Expand Down

0 comments on commit 4eb7e87

Please sign in to comment.