Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse IPC Socket #356

Merged
merged 1 commit into from
Oct 19, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 42 additions & 31 deletions web3/providers/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import sys
import os
import contextlib

try:
from json import JSONDecodeError
Expand All @@ -18,21 +17,38 @@
from .base import JSONBaseProvider


@contextlib.contextmanager
def get_ipc_socket(ipc_path, timeout=0.1):
if sys.platform == 'win32':
# On Windows named pipe is used. Simulate socket with it.
from web3.utils.windows import NamedPipe

pipe = NamedPipe(ipc_path)
with contextlib.closing(pipe):
yield pipe
return NamedPipe(ipc_path)
else:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(ipc_path)
sock.settimeout(timeout)
with contextlib.closing(sock):
yield sock
return sock


class PersistantSocket(object):
sock = None

def __init__(self, ipc_path):
self.ipc_path = ipc_path

def __enter__(self):
if not self.sock:
self.sock = get_ipc_socket(self.ipc_path)
return self.sock

def __exit__(self, exc_type, exc_value, traceback):
# only close the socket if there was an error
if exc_value is not None:
try:
self.sock.close()
except:
pass
self.sock = None


def get_default_ipc_path(testnet=False):
Expand Down Expand Up @@ -66,43 +82,38 @@ def get_default_ipc_path(testnet=False):


class IPCProvider(JSONBaseProvider):
_socket = None

def __init__(self, ipc_path=None, testnet=False, *args, **kwargs):
if ipc_path is None:
self.ipc_path = get_default_ipc_path(testnet)
else:
self.ipc_path = ipc_path

self._lock = threading.Lock()
self._socket = PersistantSocket(self.ipc_path)
super(IPCProvider, self).__init__(*args, **kwargs)

def make_request(self, method, params):
request = self.encode_rpc_request(method, params)

self._lock.acquire()

try:
with get_ipc_socket(self.ipc_path) as sock:
sock.sendall(request)
# TODO: use a BytesIO object here
raw_response = b""

with Timeout(10) as timeout:
while True:
with self._lock, self._socket as sock:
sock.sendall(request)
raw_response = b""
with Timeout(10) as timeout:
while True:
try:
raw_response += sock.recv(4096)
except socket.timeout:
timeout.sleep(0)
continue
if raw_response == b"":
timeout.sleep(0)
else:
try:
raw_response += sock.recv(4096)
except socket.timeout:
response = self.decode_rpc_response(raw_response)
except JSONDecodeError:
timeout.sleep(0)
continue

if raw_response == b"":
timeout.sleep(0)
else:
try:
response = self.decode_rpc_response(raw_response)
except JSONDecodeError:
timeout.sleep(0)
continue
else:
return response
finally:
self._lock.release()
return response