Skip to content

Commit

Permalink
More API
Browse files Browse the repository at this point in the history
  • Loading branch information
palango committed Jun 8, 2020
1 parent 003eba6 commit e672c59
Show file tree
Hide file tree
Showing 18 changed files with 294 additions and 114 deletions.
31 changes: 13 additions & 18 deletions src/monitoring_service/api.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from datetime import datetime
from typing import Dict, List, Optional, Tuple, cast
from typing import List, Optional, Tuple, cast

import pkg_resources
import structlog
from flask import Flask, Response
from eth_utils import to_checksum_address
from flask import Flask
from flask_restful import Resource
from gevent.pywsgi import WSGIServer

from monitoring_service.constants import API_PATH, DEFAULT_INFO_MESSAGE
from monitoring_service.service import MonitoringService
from raiden.utils.formatting import to_checksum_address
from raiden.utils.typing import TokenAmount
from raiden_libs.api import ApiWithErrorHandler

log = structlog.get_logger(__name__)
Expand All @@ -19,6 +18,9 @@
class MSResource(Resource):
def __init__(self, monitoring_service: MonitoringService, api: "MsApi"):
self.monitoring_service = monitoring_service
self.service_token_address = (
self.monitoring_service.context.user_deposit_contract.functions.token().call()
)
self.api = api


Expand All @@ -28,20 +30,18 @@ class InfoResource(MSResource):

def get(self) -> Tuple[dict, int]:
info = {
"price_info": self.api.service_fee,
"price_info": self.api.monitoring_service.context.min_reward,
"network_info": {
"chain_id": self.pathfinding_service.chain_id,
"chain_id": self.monitoring_service.chain_id,
"token_network_registry_address": to_checksum_address(
self.pathfinding_service.registry_address
self.monitoring_service.context.ms_state.blockchain_state.token_network_registry_address # noqa
),
"user_deposit_address": to_checksum_address(
self.pathfinding_service.user_deposit_contract.address
),
"service_token_address": to_checksum_address(
self.pathfinding_service.service_token_address
self.monitoring_service.context.user_deposit_contract.address
),
"service_token_address": to_checksum_address(self.service_token_address),
"confirmed_block": {
"number": self.pathfinding_service.blockchain_state.latest_committed_block
"number": self.monitoring_service.context.ms_state.blockchain_state.latest_committed_block # noqa
},
},
"version": self.version,
Expand All @@ -59,14 +59,12 @@ def __init__(
monitoring_service: MonitoringService,
operator: str,
info_message: str = DEFAULT_INFO_MESSAGE,
service_fee: TokenAmount = TokenAmount(0),
) -> None:
self.flask_app = Flask(__name__)
self.api = ApiWithErrorHandler(self.flask_app)
self.rest_server: Optional[WSGIServer] = None

self.monitoring_service = monitoring_service
self.service_fee = service_fee
self.operator = operator
self.info_message = info_message

Expand All @@ -78,10 +76,7 @@ def __init__(
self.api.add_resource(
resource,
API_PATH + endpoint_url,
resource_class_kwargs={
"monitoring_service": monitoring_service,
"service_api": self,
},
resource_class_kwargs={"monitoring_service": monitoring_service, "api": self},
endpoint=endpoint,
)

Expand Down
87 changes: 68 additions & 19 deletions src/monitoring_service/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from gevent import monkey, config # isort:skip # noqa

# there were some issues with the 'thread' resolver, remove it from the options
config.resolver = ["dnspython", "ares", "block"] # noqa
monkey.patch_all(subprocess=False, thread=False) # isort:skip # noqa

from typing import Dict

import click
Expand All @@ -6,7 +12,8 @@
from web3 import Web3
from web3.contract import Contract

from monitoring_service.constants import DEFAULT_MIN_REWARD, MS_DISCLAIMER
from monitoring_service.api import MsApi
from monitoring_service.constants import DEFAULT_INFO_MESSAGE, DEFAULT_MIN_REWARD, MS_DISCLAIMER
from monitoring_service.service import MonitoringService
from raiden.settings import DEFAULT_NUMBER_OF_BLOCK_CONFIRMATIONS
from raiden.utils.typing import BlockNumber, BlockTimeout
Expand All @@ -18,7 +25,12 @@
)
from raiden_libs.blockchain import get_web3_provider_info
from raiden_libs.cli import blockchain_options, common_options, setup_sentry
from raiden_libs.constants import CONFIRMATION_OF_UNDERSTANDING, DEFAULT_POLL_INTERVALL
from raiden_libs.constants import (
CONFIRMATION_OF_UNDERSTANDING,
DEFAULT_API_HOST,
DEFAULT_API_PORT_MS,
DEFAULT_POLL_INTERVALL,
)

log = structlog.get_logger(__name__)

Expand All @@ -32,6 +44,15 @@
]
)
@click.command()
@click.option(
"--host", default=DEFAULT_API_HOST, type=str, help="The host to use for serving the REST API"
)
@click.option(
"--port",
default=DEFAULT_API_PORT_MS,
type=int,
help="The port to use for serving the REST API",
)
@click.option(
"--min-reward",
default=DEFAULT_MIN_REWARD,
Expand All @@ -44,6 +65,13 @@
type=click.IntRange(min=0),
help="Number of block confirmations to wait for",
)
@click.option("--operator", default="John Doe", type=str, help="Name of the service operator")
@click.option(
"--info-message",
default=DEFAULT_INFO_MESSAGE,
type=str,
help="Place for a personal message to the customers",
)
@click.option(
"--debug-shell",
default=False,
Expand All @@ -58,14 +86,18 @@
is_flag=True,
)
@common_options("raiden-monitoring-service")
def main( # pylint: disable=too-many-arguments
def main( # pylint: disable=too-many-arguments,too-many-locals
private_key: str,
state_db: str,
web3: Web3,
contracts: Dict[str, Contract],
start_block: BlockNumber,
confirmations: BlockTimeout,
host: str,
port: int,
min_reward: int,
confirmations: BlockTimeout,
operator: str,
info_message: str,
debug_shell: bool,
accept_disclaimer: bool,
) -> int:
Expand All @@ -80,24 +112,41 @@ def main( # pylint: disable=too-many-arguments
}
log.info("Contract information", addresses=hex_addresses, start_block=start_block)

ms = MonitoringService(
web3=web3,
private_key=private_key,
contracts=contracts,
sync_start_block=start_block,
required_confirmations=confirmations,
poll_interval=DEFAULT_POLL_INTERVALL,
db_filename=state_db,
min_reward=min_reward,
)
service = None
api = None
try:
service = MonitoringService(
web3=web3,
private_key=private_key,
contracts=contracts,
sync_start_block=start_block,
required_confirmations=confirmations,
poll_interval=DEFAULT_POLL_INTERVALL,
db_filename=state_db,
min_reward=min_reward,
)

if debug_shell:
import IPython

IPython.embed()
return 0

if debug_shell:
import IPython
service.start()

IPython.embed()
return 0
log.debug("Starting API")
api = MsApi(monitoring_service=service, operator=operator, info_message=info_message)
api.run(host=host, port=port)

ms.start()
service.get()
except (KeyboardInterrupt, SystemExit):
print("Exiting...")
finally:
log.info("Stopping Monitoring Service...")
if api:
api.stop()
if service:
service.stop()

return 0

Expand Down
4 changes: 2 additions & 2 deletions src/monitoring_service/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def channel_closed_event_handler(event: Event, context: Context) -> None:
# scheduled (e.g. after a restart) the DB takes care that it is only
# stored once.
context.database.upsert_scheduled_event(
ScheduledEvent(trigger_block_number=trigger_block, event=Event)
ScheduledEvent(trigger_block_number=trigger_block, event=triggered_event)
)
else:
log.warning(
Expand Down Expand Up @@ -375,7 +375,7 @@ def monitor_new_balance_proof_event_handler(event: Event, context: Context) -> N
# If the event is already scheduled (e.g. after a restart) the DB takes care that
# it is only stored once
context.database.upsert_scheduled_event(
ScheduledEvent(trigger_block_number=trigger_block, event=Event)
ScheduledEvent(trigger_block_number=trigger_block, event=triggered_event)
)


Expand Down
28 changes: 16 additions & 12 deletions src/monitoring_service/service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import sys
import time
from datetime import datetime
from typing import Callable, Dict
from typing import Dict

import gevent
import sentry_sdk
import structlog
from eth_typing import Hash32
Expand Down Expand Up @@ -78,7 +78,8 @@ def handle_event(event: Event, context: Context) -> None:
sentry_sdk.capture_exception(ex)


class MonitoringService: # pylint: disable=too-few-public-methods,too-many-instance-attributes
class MonitoringService(gevent.Greenlet):
# pylint: disable=too-few-public-methods,too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
web3: Web3,
Expand All @@ -90,23 +91,26 @@ def __init__( # pylint: disable=too-many-arguments
poll_interval: float,
min_reward: int = 0,
):
super().__init__()

self.web3 = web3
self.chain_id = ChainID(web3.eth.chainId)
self.private_key = private_key
self.address = private_key_to_address(private_key)
self.poll_interval = poll_interval
self.service_registry = contracts[CONTRACT_SERVICE_REGISTRY]
self.token_network_registry = contracts[CONTRACT_TOKEN_NETWORK_REGISTRY]
self._is_running = gevent.event.Event()

web3.middleware_onion.add(construct_sign_and_send_raw_middleware(private_key))

monitoring_contract = contracts[CONTRACT_MONITORING_SERVICE]
user_deposit_contract = contracts[CONTRACT_USER_DEPOSIT]
token_network_registry_contract = contracts[CONTRACT_TOKEN_NETWORK_REGISTRY]

self.database = Database(
filename=db_filename,
chain_id=self.chain_id,
registry_address=to_canonical_address(token_network_registry_contract.address),
registry_address=to_canonical_address(self.token_network_registry.address),
receiver=self.address,
msc_address=MonitoringServiceAddress(
to_canonical_address(monitoring_contract.address)
Expand All @@ -125,21 +129,18 @@ def __init__( # pylint: disable=too-many-arguments
required_confirmations=required_confirmations,
)

def start(
self, wait_function: Callable = time.sleep, check_account_gas_reserve: bool = True
) -> None:
def _run(self) -> None: # pylint: disable=method-hidden
if not self.service_registry.functions.hasValidRegistration(self.address).call():
log.error("No valid registration in ServiceRegistry", address=self.address)
sys.exit(1)

last_gas_check_block = 0
while True:
while not self._is_running.is_set():
last_confirmed_block = self.context.latest_confirmed_block

# check gas reserve
do_gas_reserve_check = (
check_account_gas_reserve
and last_confirmed_block >= last_gas_check_block + DEFAULT_GAS_CHECK_BLOCKS
last_confirmed_block >= last_gas_check_block + DEFAULT_GAS_CHECK_BLOCKS
)
if do_gas_reserve_check:
check_gas_reserve(self.web3, self.private_key)
Expand All @@ -150,7 +151,10 @@ def start(
self._check_pending_transactions()
self._purge_old_monitor_requests()

wait_function(self.poll_interval)
gevent.sleep(self.poll_interval)

def stop(self) -> None:
self._is_running.set()

def _process_new_blocks(self, latest_confirmed_block: BlockNumber) -> None:
token_network_addresses = self.context.database.get_token_network_addresses()
Expand Down
Loading

0 comments on commit e672c59

Please sign in to comment.