From 4eb7e871ecb7e22d49ae390d8032ea5cd9526778 Mon Sep 17 00:00:00 2001 From: nodarret Date: Sat, 27 Jan 2018 20:59:34 +0100 Subject: [PATCH] Reconnect with channelName instead channelId (#85) #82 Bitfinex reconnect loop --- .../service/netty/NettyStreamingService.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java index 3f374ed8d..6e784e700 100644 --- a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java +++ b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java @@ -32,8 +32,15 @@ public abstract class NettyStreamingService { private static final Logger LOG = LoggerFactory.getLogger(NettyStreamingService.class); private class Subscription { - ObservableEmitter emitter; - Object[] args; + final ObservableEmitter emitter; + final String channelName; + final Object[] args; + + public Subscription(ObservableEmitter emitter, String channelName, Object[] args) { + this.emitter = emitter; + this.channelName = channelName; + this.args = args; + } } private final int maxFramePayloadLength; @@ -188,9 +195,7 @@ public Observable subscribeChannel(String channelName, Object... args) { } if (!channels.containsKey(channelId)) { - Subscription newSubscription = new Subscription(); - newSubscription.args = args; - newSubscription.emitter = e; + Subscription newSubscription = new Subscription(e, channelName, args); channels.put(channelId, newSubscription); try { sendMessage(getSubscribeMessage(channelName, args)); @@ -207,11 +212,12 @@ public Observable subscribeChannel(String channelName, Object... args) { } public void resubscribeChannels() { - for (String channelName : channels.keySet()) { + for (String channelId : channels.keySet()) { try { - sendMessage(getSubscribeMessage(channelName, channels.get(channelName).args)); + Subscription subscription = channels.get(channelId); + sendMessage(getSubscribeMessage(subscription.channelName, subscription.args)); } catch (IOException e) { - LOG.error("Failed to reconnect channel: {}", channelName); + LOG.error("Failed to reconnect channel: {}", channelId); } } }