Skip to content

Commit

Permalink
Merge pull request #4 from a-hurst/check_port
Browse files Browse the repository at this point in the history
Add validation of specified serial port
  • Loading branch information
a-hurst authored Feb 8, 2024
2 parents b31f71f + 4f3015f commit dfacddd
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 8 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name: Tests

on:
push:
branches: [ testing ]
branches: [ main ]
pull_request:
branches: [ testing ]
branches: [ main ]
create:
branches: [ testing ]
branches: [ main ]
tags: ['**']


Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ magneto 0.0.3
communication error occurs with the stimulator.
* Fixed rare startup crash when Magstim sends a message consisting only of null
bytes.
* Added a check to ensure the requested serial port exists on the system when
initializing a Magstim class, raising an informative exception if it doesn't.


magneto 0.0.2
Expand Down
22 changes: 20 additions & 2 deletions magneto/magstim.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
from queue import Queue

from .constants import *
from .utils import build_command, get_mode_byte, int_to_ascii, _validate_response
from .utils import (
build_command, get_mode_byte, int_to_ascii, _validate_response,
_get_available_ports,
)
from .communication import Response, MagstimStatus, _serial_connect, comm_loop


Expand All @@ -29,7 +32,7 @@ class Magstim(object):
"""
def __init__(self, port, bistim_sd=True):
self._port = port
self._port = self._validate_port(port)
self._status = None
self._to_stim = Queue()
self._from_stim = Queue()
Expand All @@ -38,6 +41,21 @@ def __init__(self, port, bistim_sd=True):
self._com_thread = None
self._simultaneous = bistim_sd

def _validate_port(self, port):
# Makes sure the requested serial port exists, raising an informative exception
# if it doesn't
ports = _get_available_ports()
if not len(ports):
raise RuntimeError(
"No serial ports available, please make sure your serial adapter is "
"plugged in!"
)
if port not in ports:
e = "Serial port '{0}' does not exist on this system, please try a "
e += "different one (available ports: ['{1}'])"
raise RuntimeError(e.format(port, "', '".join(ports)))
return port

def connect(self):
"""Initializes the connection to the stimulator.
Expand Down
12 changes: 10 additions & 2 deletions magneto/tests/test_magstim.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

import pytest
from unittest import mock

import time
from queue import Queue
from magneto import Magstim
Expand All @@ -11,7 +13,9 @@

@pytest.fixture()
def mockstim():
tst = Magstim("COM1")
with mock.patch("magneto.magstim._get_available_ports") as get_ports:
get_ports.return_value = ['COM1']
tst = Magstim("COM1")
tst._from_stim = Queue()
tst._to_stim = Queue()
tst._com_thread = True
Expand All @@ -26,7 +30,11 @@ def add_to_queue(magstim, cmds):
class TestMagstim(object):

def test_init(self):
tst = Magstim("COM1")
with mock.patch("magneto.magstim._get_available_ports") as get_ports:
get_ports.return_value = ['COM1', 'COM4']
tst = Magstim("COM1")
with pytest.raises(RuntimeError):
tst = Magstim("COM3")

def test_pump(self, mockstim):
tst = mockstim
Expand Down
2 changes: 1 addition & 1 deletion magneto/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ def _validate_response(resp):

def _get_available_ports():
# Returns a list of all available serial ports
return [p.device for p in comports()]
return [p.device for p in comports() if not "Bluetooth" in p.device]

0 comments on commit dfacddd

Please sign in to comment.