Skip to content

Commit

Permalink
Clean up socket/handles/tempdirs for shm connections
Browse files Browse the repository at this point in the history
  • Loading branch information
sk1p committed Sep 16, 2024
1 parent 873949a commit 59af5ce
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 22 deletions.
9 changes: 6 additions & 3 deletions src/libertem_live/detectors/asi_tpx3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from libertem.common.math import prod

from libertem_live.detectors.common import cleanup_handle_dir
from libertem_live.detectors.base.connection import (
PendingAcquisition, DetectorConnection,
AcquisitionProtocol,
Expand Down Expand Up @@ -66,20 +67,21 @@ def __init__(
self._bytes_per_chunk = bytes_per_chunk
self._huge_pages = huge_pages
self._chunks_per_stack = chunks_per_stack
self._shm_handle_path = None

self._conn = self._connect()

def _connect(self):
handle_path = self._make_handle_path()
self._shm_handle_path = self._make_handle_path()
uri = f"{self._data_host}:{self._data_port}"
logger.info(f"connecting to {uri} with shared memory handle {handle_path}")
logger.info(f"connecting to {uri} with shared memory handle {self._shm_handle_path}")
return ASITpx3Connection(
uri=uri,
chunks_per_stack=self._chunks_per_stack,
num_slots=self._num_slots,
bytes_per_chunk=self._bytes_per_chunk,
huge=self._huge_pages,
handle_path=handle_path,
handle_path=self._shm_handle_path,
)

def wait_for_acquisition(
Expand Down Expand Up @@ -113,6 +115,7 @@ def close(self):
self._conn.close()
self._passive_started = False
self._conn = None
cleanup_handle_dir(self._shm_handle_path)

def __enter__(self):
if self._conn is None:
Expand Down
13 changes: 13 additions & 0 deletions src/libertem_live/detectors/common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import threading
import pathlib
import logging
from typing import Optional
import socket
Expand Down Expand Up @@ -158,3 +159,15 @@ def set_thread_name(name: str):
if prctl is None:
return
prctl.set_name(name)


def cleanup_handle_dir(path: Optional[str]):
if path is None:
return
path = pathlib.Path(path)
# the socket may not yet exist, even if we set up the temporary directory:
if path.exists():
path.unlink()
p_dir = path.parent
if p_dir.exists():
p_dir.rmdir()
6 changes: 5 additions & 1 deletion src/libertem_live/detectors/dectris/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import tempfile
from typing import Optional, TYPE_CHECKING

from libertem_live.detectors.common import cleanup_handle_dir
from libertem_live.detectors.base.connection import (
DetectorConnection,
)
Expand Down Expand Up @@ -77,15 +78,17 @@ def __init__(
num_slots = int(math.floor(buffer_size_bytes / (bytes_per_frame * frame_stack_size)))
self._num_slots = num_slots
self._conn: libertem_dectris.DectrisConnection = self._connect()
self._shm_handle_path = None

def _connect(self):
self._shm_handle_path = self._make_socket_path()
return libertem_dectris.DectrisConnection(
uri=f"tcp://{self._data_host}:{self._data_port}",
frame_stack_size=self._frame_stack_size,
num_slots=self._num_slots,
bytes_per_frame=self._bytes_per_frame,
huge=self._huge_pages,
handle_path=self._make_socket_path(),
handle_path=self._shm_handle_path,
)

def __enter__(self):
Expand Down Expand Up @@ -299,6 +302,7 @@ def close(self):
self._conn.close()
self._conn = None
self._passive_started = False
cleanup_handle_dir(self._shm_handle_path)

def reconnect(self):
if self._conn is not None:
Expand Down
6 changes: 5 additions & 1 deletion src/libertem_live/detectors/merlin/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections.abc import Generator
from contextlib import contextmanager

from libertem_live.detectors.common import cleanup_handle_dir
from libertem_live.detectors.base.connection import DetectorConnection, PendingAcquisition
from libertem_live.detectors.base.acquisition import AcquisitionProtocol

Expand Down Expand Up @@ -105,16 +106,18 @@ def __init__(
self._drain = drain
self._recovery_strategy = recovery_strategy
self._huge_pages = huge_pages
self._shm_handle_path = None
self.connect()

def connect(self):
if self._data_socket is not None:
return self._data_socket # already connected
self._shm_handle_path = self._make_socket_path()
self._data_socket = libertem_qd_mpx.QdConnection(
data_host=self._data_host,
data_port=self._data_port,
frame_stack_size=16, # FIXME! make configurable or determine automatically
shm_handle_path=self._make_socket_path(),
shm_handle_path=self._shm_handle_path,
drain=self._drain,
recovery_strategy=self._recovery_strategy,
huge=self._huge_pages,
Expand Down Expand Up @@ -187,6 +190,7 @@ def close(self):
if self._data_socket is not None:
self._data_socket.close()
self._data_socket = None
cleanup_handle_dir(self._shm_handle_path)

def get_conn_impl(self):
return self._data_socket # maybe remove `get_data_socket` function?
Expand Down
39 changes: 22 additions & 17 deletions tests/detectors/merlin/test_merlin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from libertem_live.api import Hooks, LiveContext
from libertem_live.hooks import ReadyForDataEnv
from libertem_live.detectors.common import cleanup_handle_dir
from libertem_live.detectors.merlin import (
MerlinControl,
)
Expand Down Expand Up @@ -87,26 +88,30 @@ def test_small_shm(
import libertem_qd_mpx
# monkey-patch in a different underlying connection:
conn._data_socket.close()
conn._data_socket = libertem_qd_mpx.QdConnection(
data_host=host,
data_port=port,
frame_stack_size=1,
num_slots=4, # NOTE this ridiculously small value
shm_handle_path=conn._make_socket_path(),
drain=False,
)
conn._data_socket.start_passive()
shm_socket_path = conn._make_socket_path()
try:
conn._data_socket = libertem_qd_mpx.QdConnection(
data_host=host,
data_port=port,
frame_stack_size=1,
num_slots=4, # NOTE this ridiculously small value
shm_handle_path=shm_socket_path,
drain=False,
)
conn._data_socket.start_passive()

aq = ctx_pipelined.make_acquisition(
conn=conn,
nav_shape=(32, 32),
)
udf = SumUDF()
aq = ctx_pipelined.make_acquisition(
conn=conn,
nav_shape=(32, 32),
)
udf = SumUDF()

res = ctx_pipelined.run_udf(dataset=aq, udf=udf)
ref = ctx_pipelined.run_udf(dataset=merlin_ds, udf=udf)
res = ctx_pipelined.run_udf(dataset=aq, udf=udf)
ref = ctx_pipelined.run_udf(dataset=merlin_ds, udf=udf)

assert_allclose(res['intensity'], ref['intensity'])
assert_allclose(res['intensity'], ref['intensity'])
finally:
cleanup_handle_dir(shm_socket_path)


def test_passive_acquisition(
Expand Down

0 comments on commit 59af5ce

Please sign in to comment.