[Feature Request] Level locking #262
Comments
I still need to work out how this will tie into the history system, but this is the implementation so far.

from __future__ import annotations
from functools import cached_property
from weakref import proxy, WeakValueDictionary
from threading import Condition, RLock, Lock
from contextlib import contextmanager
from collections import deque
from copy import deepcopy
from contextvars import ContextVar
from typing import Optional
from uuid import uuid4
# Should level objects be editable by default.
DefaultEditMode = True
# Can edit operations run in parallel.
ParallelEditing = False
# Is the history system enabled.
HistoryEnabled = True
class Chunk:
    pass


class LockNotAcquired(RuntimeError):
    pass
class TokenLock:
    """
    A custom lock that allows all threads with the same token to run in parallel.
    This is useful to support serial operations that can have parallel threads.
    """

    def __init__(self):
        self._lock = Lock()
        self._condition = Condition(self._lock)
        self._token: Optional[str] = None
        self._count: int = 0

    def _acquire_shared(self, token: str):
        with self._lock:
            while not (self._token is None or self._token == token):
                # Wait until the lock is not locked or is locked with this token.
                self._condition.wait()
            self._token = token
            self._count += 1

    def _release_shared(self, token: str):
        with self._lock:
            if self._token == token:
                self._count -= 1
                if self._count <= 0:
                    self._token = None
                    self._count = 0
            if self._token is None:
                # Wake up any threads waiting to acquire the lock
                self._condition.notify_all()

    @contextmanager
    def lock(self, token: Optional[str] = None):
        """
        Acquire the lock with a context manager.

        If the lock is not locked it will be locked with the given token.
        If the lock is locked with the same token the thread will continue.
        If the lock is locked with a different token this will block until the lock can be acquired with the given token.

        :param token: The token to use. Defaults to a random UUID.
        """
        if token is None:
            token = str(uuid4())
        self._acquire_shared(token)
        try:
            yield
        finally:
            self._release_shared(token)
class PrivateLevel:
    """Storage of private level data"""

    def __init__(self):
        self.editable_var = ContextVar("edit_mode", default=DefaultEditMode)
        self.edit_lock = TokenLock()

    @property
    def editable(self) -> bool:
        return self.editable_var.get()


class ChunkStorage:
    def __init__(self, level: PrivateLevel):
        # Weak pointer to the level to get raw and shared data
        self._level: PrivateLevel = proxy(level)
        # Mapping from chunk location to chunk object. Weakly stored so that we don't need to manually unload.
        self._chunks = WeakValueDictionary[tuple[str, int, int], Chunk]()
        # A deque to keep recently/frequently used chunks loaded
        self._chunk_cache = deque[Chunk](maxlen=100)
        # A lock per chunk
        self._locks = WeakValueDictionary[tuple[str, int, int], RLock]()
        # A lock that must be acquired before touching _locks
        self._locks_lock = Lock()

    def __get_lock(self, key: tuple[str, int, int]) -> RLock:
        with self._locks_lock:
            lock = self._locks.get(key)
            if lock is None:
                lock = self._locks[key] = RLock()
            return lock
    @contextmanager
    def lock(self, dimension: str, cx: int, cz: int, *, blocking: bool = True, timeout: float = -1):
        """
        Lock access to the chunk.

        >>> with level.chunk.lock(dimension, cx, cz):
        >>>     # Do what you need to with the chunk
        >>>     # No other threads are able to edit or set the chunk while in this with block.

        If you want to lock, get and set the chunk data, :meth:`edit` is probably a better fit.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param blocking: Should this block until the lock is acquired.
        :param timeout: The amount of time to wait for the lock.
        :raises:
            LockNotAcquired: If the lock was not acquired.
        """
        key = (dimension, cx, cz)
        lock = self.__get_lock(key)
        if not lock.acquire(blocking, timeout):
            # The lock was not acquired (non-blocking call failed or the timeout expired).
            raise LockNotAcquired("Lock was not acquired.")
        try:
            yield
        finally:
            lock.release()
    @contextmanager
    def edit(self, dimension: str, cx: int, cz: int, blocking: bool = True, timeout: float = -1):
        """
        Lock and edit a chunk.

        >>> with level.chunk.edit(dimension, cx, cz) as chunk:
        >>>     # Edit the chunk data
        >>>     # No other threads are able to edit the chunk while in this with block.
        >>>     # When the with block exits the edited chunk will be automatically set if no exception occurred.
        """
        if not self._level.editable:
            raise RuntimeError("The level is not editable in this context.")
        with self.lock(dimension, cx, cz, blocking=blocking, timeout=timeout):
            chunk = self.get(dimension, cx, cz)
            yield chunk
            # If an exception occurs in user code, this line won't be run.
            self.set(dimension, cx, cz, chunk)
    def get(self, dimension: str, cx: int, cz: int) -> Chunk:
        """
        Get a deep copy of the chunk data.

        If you want to edit the chunk, use :meth:`edit` instead.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :return: A unique copy of the chunk data.
        """
        return Chunk()

    def set(self, dimension: str, cx: int, cz: int, chunk: Chunk):
        """
        Overwrite the chunk data.

        You must lock access to the chunk before setting it otherwise an exception may be raised.
        If you want to edit the chunk, use :meth:`edit` instead.

        :param dimension: The dimension the chunk is stored in.
        :param cx: The chunk x coordinate.
        :param cz: The chunk z coordinate.
        :param chunk: The chunk data to set.
        :raises:
            LockNotAcquired: If the chunk is already locked by another thread.
        """
        if not self._level.editable:
            raise RuntimeError("The level is not editable in this context.")
        key = (dimension, cx, cz)
        lock = self.__get_lock(key)
        if lock.acquire(False):
            try:
                chunk = deepcopy(chunk)
                # TODO: set the chunk and notify listeners
            finally:
                lock.release()
        else:
            raise LockNotAcquired("Cannot set a chunk if it is locked by another thread.")

    def on_change(self, callback):
        """A notification system for chunk changes."""
        raise NotImplementedError
class Level:
    def __init__(self):
        self._data = PrivateLevel()

    @property
    def editable(self) -> bool:
        """Is the level editable in this context."""
        return self._data.editable

    @contextmanager
    def edit(self, transaction_token: Optional[str] = None):
        """
        Make the level editable in this context.
        Depending on the configuration this may block until other operations have completed.

        >>> with level.edit():
        >>>     # edit the level
        >>> # Level is no longer editable

        :param transaction_token: Optional UUID string. If an operation uses threads, using the same transaction_token will allow them to run in parallel.
        """
        if ParallelEditing:
            # Operations are configured to allow running in parallel.
            # The token is not used in this context.
            transaction_token = ""
        with self._data.edit_lock.lock(transaction_token):
            token = self._data.editable_var.set(True)
            try:
                yield
            finally:
                self._data.editable_var.reset(token)

    @cached_property
    def chunk(self) -> ChunkStorage:
        return ChunkStorage(self._data)
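
To make the intended usage concrete, here is a rough, untested sketch of how an operation might drive the API above. The dimension string and the worker layout are placeholders, and Chunk is still a stub, so the "edit" step does nothing yet.

from threading import Thread

def example_operation(level: Level):
    # One token per operation: worker threads that share it can hold the
    # level edit lock at the same time, while other operations are blocked.
    transaction_token = str(uuid4())

    def worker(cx: int, cz: int):
        # Each thread enters its own edit context with the shared token.
        with level.edit(transaction_token):
            with level.chunk.edit("minecraft:overworld", cx, cz) as chunk:
                pass  # mutate the chunk copy; it is written back on exit

    threads = [Thread(target=worker, args=(cx, 0)) for cx in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()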
@Podshot raised the issue of deadlocking when two or more threads each try to acquire a lock that another of those threads already holds. The only solution I can see is to implement a custom lock that checks for the deadlock condition and raises an exception. Here is my current implementation of a deadlock-detecting RLock.

from __future__ import annotations
from threading import Thread, RLock, Lock, Condition, get_ident
import time
from typing import Optional
from contextlib import contextmanager
_lock = Lock()
# Map from thread to the locks it has acquired and is waiting on.
_threads: dict[int, ThreadState] = {}
class DeadlockError(Exception):
    pass


class ThreadState:
    locked: set[SafeRLock]  # The locks this thread has acquired
    waiting: Optional[SafeRLock]  # The lock this thread is waiting for. Might be None.

    def __init__(self):
        self.locked = set()
        self.waiting = None
class SafeRLock:
    """RLock that raises a DeadlockError when acquiring if it would deadlock."""

    _lock: RLock
    _condition: Condition
    _pending: set[int]  # The threads waiting for this lock
    _owner: Optional[int]  # The thread that owns this lock
    _lock_count: int  # How many times the lock has been acquired by its owner

    def __init__(self):
        self._lock = RLock()
        self._condition = Condition(_lock)
        self._owner = None
        self._pending = set()
        self._lock_count = 0

    def _would_deadlock(self) -> bool:
        """If this lock were acquired, would it create a deadlock state."""
        this_thread_id = get_ident()
        owner_thread_id = self._owner
        if owner_thread_id == this_thread_id:
            # This lock is locked by this thread. Not a deadlock.
            return False
        lock = self
        while True:
            # Find the thread that owns the lock
            owner_thread_id = lock._owner
            if owner_thread_id is None:
                # This lock is not locked. Not a deadlock.
                return False
            elif owner_thread_id == this_thread_id:
                # Following the chain of waits leads back to this thread: a cycle.
                return True
            # See which lock that thread is waiting for
            lock = _threads[owner_thread_id].waiting
            if lock is None:
                # The thread is not waiting for a lock. Not a deadlock.
                return False
    def acquire(self, blocking=True, timeout=-1):
        with _lock:
            # Try and acquire the lock without blocking
            if self._lock.acquire(blocking=False):
                # If this succeeds then the lock is not in use (or is already held by this thread).
                ident = get_ident()
                self._owner = ident
                self._lock_count += 1
                _threads.setdefault(ident, ThreadState()).locked.add(self)
                return True
            if not blocking:
                # We already tried to acquire without blocking and failed
                return False
            if self._would_deadlock():
                raise DeadlockError("Acquiring this lock would lead to a deadlock.")
            # Wait for the lock
            ident = get_ident()
            self._pending.add(ident)
            thread_state = _threads.setdefault(ident, ThreadState())
            thread_state.waiting = self
            # Condition.wait needs None to wait forever, so convert the -1 sentinel.
            notified = self._condition.wait(None if timeout < 0 else timeout)
            # Remove pending state whether we were notified or timed out.
            self._pending.remove(ident)
            thread_state.waiting = None
            if not notified:
                # Timed out
                return False
            if not self._lock.acquire(blocking=False):
                # Only one thread should be resumed so this shouldn't happen
                raise RuntimeError
            # Add locked state
            self._owner = ident
            self._lock_count += 1
            thread_state.locked.add(self)
            return True
    def release(self):
        with _lock:
            ident = get_ident()
            if self._owner != ident:
                raise RuntimeError("Lock not owned by thread that tried releasing it.")
            self._lock_count -= 1
            if self._lock_count < 0:
                raise RuntimeError
            if self._lock_count == 0:
                # Fully released: clear ownership and bookkeeping state.
                self._owner = None
                thread_state = _threads[ident]
                thread_state.locked.remove(self)
                if not thread_state.locked and thread_state.waiting is None:
                    del _threads[ident]
                # Wake up one waiting thread, if any.
                self._condition.notify()
            self._lock.release()

    @contextmanager
    def lock(self, blocking=True, timeout=-1):
        locked = self.acquire(blocking, timeout)
        try:
            yield locked
        finally:
            if locked:
                self.release()
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(threadName)s - %(message)s")

l1 = SafeRLock()
l2 = SafeRLock()
l3 = SafeRLock()


def op1():
    logging.info("op1 getting l1")
    with l1.lock():
        logging.info("op1 got l1")
        time.sleep(1)
        logging.info("op1 getting l2")
        with l2.lock():
            logging.info("op1 got l2")
            time.sleep(1)


def op2():
    logging.info("op2 getting l2")
    with l2.lock():
        logging.info("op2 got l2")
        time.sleep(1)
        logging.info("op2 getting l3")
        with l3.lock():
            logging.info("op2 got l3")
            time.sleep(1)


def op3():
    logging.info("op3 getting l3")
    with l3.lock():
        logging.info("op3 got l3")
        time.sleep(1)
        logging.info("op3 getting l1")
        with l1.lock():
            logging.info("op3 got l1")
            time.sleep(1)


def main():
    t1 = Thread(target=op1)
    t2 = Thread(target=op2)
    t3 = Thread(target=op3)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    logging.info("finished")


if __name__ == '__main__':
    main()
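
As a smaller, untested sanity check, here is a two-lock variant of the same cycle reusing SafeRLock and DeadlockError from above. Whichever thread happens to close the cycle second gets the DeadlockError and backs off instead of hanging; the sleeps only exist to force the interleaving.

def deadlock_demo():
    a = SafeRLock()
    b = SafeRLock()

    def grab(first: SafeRLock, second: SafeRLock, name: str):
        with first.lock():
            time.sleep(0.5)
            try:
                with second.lock():
                    logging.info("%s acquired both locks", name)
            except DeadlockError:
                logging.info("%s detected the deadlock and backed off", name)

    threads = [
        Thread(target=grab, args=(a, b, "forward")),
        Thread(target=grab, args=(b, a, "backward")),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()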
Feature Request
The Problem
Some thought needs to be put into the history system and how threading will affect it.
The history system must work by starting a transaction, making changes and then ending the transaction.
All changes made as part of that transaction are part of that undo operation.
This is very simple if only one operation is running at once, because all of the changes come from that one operation, even if it uses multiple threads.
The complexity is introduced when two operations are running in parallel.
The solutions I see are:
We also need to support having no history system, in which case the operations can happily run in parallel.
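
For illustration, the transaction model described above could look something like the sketch below. The HistoryManager name and its API are hypothetical and do not exist in the codebase; it only shows how changes made between the start and end of a transaction would collapse into a single undo entry.

from contextlib import contextmanager


class HistoryManager:
    """Hypothetical sketch: collects changes into per-transaction undo entries."""

    def __init__(self):
        self._undo_stack = []  # Each entry is the list of changes of one transaction.
        self._active = None    # Changes of the currently open transaction, if any.

    @contextmanager
    def transaction(self):
        # Start a transaction: every change recorded until the block exits
        # becomes one undo entry.
        self._active = []
        try:
            yield
        finally:
            if self._active:
                self._undo_stack.append(self._active)
            self._active = None

    def record(self, change):
        # Called by the level/chunk code whenever something is modified.
        if self._active is not None:
            self._active.append(change)


# Usage: all changes inside the with block become a single undo step.
# history = HistoryManager()
# with history.transaction():
#     history.record(("set_chunk", "minecraft:overworld", 0, 0))
#     history.record(("set_chunk", "minecraft:overworld", 1, 0))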
Feature Description
I suggest a global switch that allows switching between serial and parallel operations (if the operations support it) and another to enable or disable the history system.
We also need a lock that can block operations from running at the same time when in serial mode.
This lock will also need to allow multiple threads belonging to a single operation to execute in parallel.