Skip to content

Commit

Permalink
rework reconnect I2CP reconnect logic to prevent infinite loop when c…
Browse files Browse the repository at this point in the history
…onnection to I2P router is lost
  • Loading branch information
zlatinb committed Aug 8, 2024
1 parent 2c4d303 commit 3f72d20
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 47 deletions.
4 changes: 2 additions & 2 deletions core/src/main/groovy/com/muwire/core/Core.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ public class Core {
eventBus.register(UIBrowseEvent.class, browseManager)

log.info("initializing acceptor")
I2PAcceptor i2pAcceptor = new I2PAcceptor(i2pConnector::getSocketManager)
I2PAcceptor i2pAcceptor = new I2PAcceptor()
eventBus.register(RouterConnectedEvent.class, i2pAcceptor)
eventBus.register(RouterDisconnectedEvent.class, i2pAcceptor)
connectionAcceptor = new ConnectionAcceptor(eventBus, me, profileSupplier, connectionManager, props,
Expand Down Expand Up @@ -620,7 +620,7 @@ public class Core {
}
}

i2pConnector.connect()
i2pConnector.start()
contentManager.start()
hostCache.start({connectionManager.getConnections().collect{ it.endpoint.destination }} as Supplier)
connectionManager.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.muwire.core

import net.i2p.client.I2PSession
import net.i2p.client.streaming.I2PSocketManager

class RouterConnectedEvent extends Event {
I2PSession session
I2PSocketManager socketManager
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,17 @@ import net.i2p.I2PException
import net.i2p.client.streaming.I2PServerSocket
import net.i2p.client.streaming.I2PSocketManager

import java.util.function.Supplier
import java.util.logging.Level

@Log
class I2PAcceptor {

private final Supplier<I2PSocketManager> socketManager
private volatile I2PServerSocket serverSocket

I2PAcceptor() {}

I2PAcceptor(Supplier<I2PSocketManager> socketManager) {
this.socketManager = socketManager
}

synchronized void onRouterConnectedEvent(RouterConnectedEvent event) {
serverSocket = socketManager.get().getServerSocket()
serverSocket = event.socketManager.getServerSocket()
notifyAll()
}

Expand Down
79 changes: 51 additions & 28 deletions core/src/main/groovy/com/muwire/core/connection/I2PConnector.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.muwire.core.RouterDisconnectedEvent
import net.i2p.client.streaming.I2PSocketManager
import net.i2p.client.streaming.I2PSocketManagerFactory
import net.i2p.data.Destination
import net.i2p.I2PException

import java.util.concurrent.Semaphore

Expand All @@ -19,8 +20,11 @@ class I2PConnector {
private final int i2cpPort
private final Properties i2pProperties

volatile I2PSocketManager socketManager
private final I2PSocketManager socketManager
private volatile boolean connected

private Thread connectorThread

private final Map<Destination, Semaphore> limiter = Collections.synchronizedMap(new WeakHashMap<>())

I2PConnector() {}
Expand All @@ -31,44 +35,63 @@ class I2PConnector {
this.i2cpHost = i2cpHost
this.i2cpPort = i2cpPort
this.i2pProperties = i2pProperties

I2PSocketManager socketManager
keyDat.withInputStream {
socketManager = new I2PSocketManagerFactory().createDisconnectedManager(it, i2cpHost, i2cpPort, i2pProperties)
}
socketManager.getDefaultOptions().with {
setReadTimeout(60000)
setConnectTimeout(15000)
}
socketManager.addDisconnectListener({
connected = false
eventBus.publish(new RouterDisconnectedEvent())
} as I2PSocketManager.DisconnectListener)
this.socketManager = socketManager
}

synchronized void connect() {
if (socketManager != null)
return
while(true) {
I2PSocketManager socketManager
keyDat.withInputStream {
socketManager = new I2PSocketManagerFactory().createDisconnectedManager(it, i2cpHost, i2cpPort, i2pProperties)
}
socketManager.getDefaultOptions().with {
setReadTimeout(60000)
setConnectTimeout(15000)
}
socketManager.addDisconnectListener({
this.socketManager = null
eventBus.publish(new RouterDisconnectedEvent())
} as I2PSocketManager.DisconnectListener)
private void connectI2CP() {

def session = socketManager.getSession()
try {
session.connect()
this.socketManager = socketManager
eventBus.publish(new RouterConnectedEvent(session: session))
break
} catch (Exception e) {
Thread.sleep(1000)
def session = socketManager.getSession()
session.connect()

connected = true
eventBus.publish(new RouterConnectedEvent(session: session,
socketManager: socketManager))
}

void start() {
connectI2CP()
Runnable r = {
while(true) {
try {
if (!connected)
connectI2CP()
} catch (Exception ignored) {}
finally {
try {
Thread.sleep(1000)
} catch (InterruptedException ie) {
break
}
}
}
}
connectorThread = new Thread(r)
connectorThread.setDaemon(true)
connectorThread.setName("I2CP Connector")
connectorThread.start()
}

void shutdown() {
socketManager?.destroySocketManager()
socketManager = null
connectorThread?.interrupt()
socketManager.destroySocketManager()
}

Endpoint connect(Destination dest) {
connect()
if (!connected)
throw new I2PException("No I2CP connection")
Semaphore limit = limiter.computeIfAbsent(dest, {new Semaphore(PERMITS)})
limit.acquire()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class CacheClient {
final MuWireSettings settings
final Timer timer
private volatile I2PSession session
private volatile boolean connected
private final AtomicBoolean stopped = new AtomicBoolean();

public CacheClient(EventBus eventBus, HostCache cache,
Expand All @@ -56,16 +57,19 @@ class CacheClient {
}

void onRouterConnectedEvent(RouterConnectedEvent event) {
session = event.session
session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, 0)
connected = true
if (session == null) {
session = event.session
session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, 0)
}
}

void onRouterDisconnectedEvent(RouterDisconnectedEvent event) {
session = null
connected = false
}

private void queryIfNeeded() {
if (stopped.get())
if (stopped.get() || !connected)
return
I2PSession session = this.session
if (session == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class TrackerResponder {
private volatile boolean shutdown

private volatile I2PSession i2pSession
private volatile boolean connected

TrackerResponder(MuWireSettings muSettings,
FileManager fileManager, DownloadManager downloadManager,
Expand All @@ -70,12 +71,15 @@ class TrackerResponder {
}

void onRouterConnectedEvent(RouterConnectedEvent event) {
i2pSession = event.session
i2pSession.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT)
connected = true
if (i2pSession == null) {
i2pSession = event.session
i2pSession.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT)
}
}

void onRouterDisconnectedEvent(RouterDisconnectedEvent event) {
i2pSession = null
connected = false
}

private void expireUUIDs() {
Expand All @@ -91,6 +95,8 @@ class TrackerResponder {
}

private void respond(host, json) {
if (!connected)
return
I2PSession i2pSession = this.i2pSession
if (i2pSession == null)
return
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/groovy/com/muwire/core/update/UpdateClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class UpdateClient {
private long lastUpdateCheckTime

private volatile I2PSession session
private volatile boolean connected

private volatile InfoHash updateInfoHash
private volatile String version, signer
Expand Down Expand Up @@ -77,12 +78,15 @@ class UpdateClient {
}

void onRouterConnectedEvent(RouterConnectedEvent event) {
this.session = event.session
session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.UPDATE_PORT)
connected = true
if (session == null) {
this.session = event.session
session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.UPDATE_PORT)
}
}

void onRouterDisconnectedEvent(RouterDisconnectedEvent event) {
session = null
connected = false
}

void onUIResultBatchEvent(UIResultBatchEvent results) {
Expand All @@ -105,6 +109,8 @@ class UpdateClient {
}

private void checkUpdate() {
if (!connected)
return
I2PSession session = this.session
if (session == null)
return
Expand Down

0 comments on commit 3f72d20

Please sign in to comment.