Skip to content

Commit

Permalink
Nasty workaround for a bug in aioredis locking
Browse files Browse the repository at this point in the history
  • Loading branch information
codado-nl committed Jun 30, 2022
1 parent 6a7e3ff commit 2eb6b19
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
3 changes: 3 additions & 0 deletions orchestrator/distlock/distlock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ async def get_lock(self, resource: str, expiration_seconds: int) -> Optional[Dis

async def release_lock(self, resource: DistLock) -> None:
await self._backend.release_lock(resource) # type: ignore

def release_sync(self, resource: DistLock) -> None:
self._backend.release_sync(resource) # type: ignore
4 changes: 4 additions & 0 deletions orchestrator/distlock/managers/memory_distlock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from threading import Lock, Thread
from time import sleep, time
from typing import Dict, Optional, Tuple
Expand Down Expand Up @@ -63,6 +64,9 @@ async def release_lock(self, lock: Lock) -> None:
del self.locks[name]
logger.debug("Successfully unlocked resource", resource=name)

def release_sync(self, lock: Lock) -> None:
asyncio.run(self.release_lock(lock))

def run(self) -> None:
while True:
with self.manager_lock:
Expand Down
23 changes: 23 additions & 0 deletions orchestrator/distlock/managers/redis_distlock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# limitations under the License.
from typing import Optional, Tuple

from redis import Redis
from redis.asyncio import Redis as AIORedis
from redis.asyncio.lock import Lock
from redis.exceptions import LockError
from redis.lock import Lock as SyncLock
from structlog import get_logger

logger = get_logger(__name__)
Expand Down Expand Up @@ -50,6 +52,7 @@ async def get_lock(self, resource: str, expiration_seconds: int) -> Optional[Loc
name=key,
timeout=float(expiration_seconds),
blocking=False,
thread_local=False,
)
if await lock.acquire():
return lock
Expand All @@ -70,6 +73,26 @@ async def release_lock(self, lock: Lock) -> None:
except LockError:
logger.Exception("Could not release lock for resource", resource=lock.name)

# https://github.com/aio-libs/aioredis-py/issues/1273
def release_sync(self, lock: Lock) -> None:
redis_conn: Optional[Redis] = None
try:
redis_conn = Redis(host=self.redis_address[0], port=self.redis_address[1])
sync_lock: SyncLock = SyncLock(
redis=redis_conn,
name=lock.name,
timeout=lock.timeout,
blocking=False,
thread_local=False,
)
sync_lock.local = lock.local
sync_lock.release()
except LockError:
logger.Exception("Could not release lock for resource", resource=lock.name)
finally:
if redis_conn:
redis_conn.close()


__all__ = [
"RedisDistLockManager",
Expand Down
8 changes: 1 addition & 7 deletions orchestrator/services/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from http import HTTPStatus
Expand Down Expand Up @@ -417,12 +416,7 @@ def run() -> None:
logger.exception("Failed to resume process", pid=_proc.pid)
logger.info("Completed resuming processes")
finally:
loop = asyncio.new_event_loop()
loop.run_until_complete(distlock_manager.release_lock(lock)) # type: ignore
try:
loop.close()
except Exception: # noqa: S110
pass
distlock_manager.release_sync(lock) # type: ignore

workflow_executor = get_thread_pool()

Expand Down

0 comments on commit 2eb6b19

Please sign in to comment.