SQLAlchemy Async error #246
Replies: 6 comments
-
You cannot use one session in multiple asyncio tasks.
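A minimal stdlib-only sketch of why this matters. `ToySession`, `shared_session` and `session_per_task` are hypothetical names made up for illustration; `ToySession` is not SQLAlchemy's `AsyncSession`, it only imitates the property that a session holds state for one operation at a time.

```python
import asyncio


class ToySession:
    """Hypothetical stand-in for a DB session: refuses overlapping use."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise RuntimeError("session is already in use by another task")
        self._busy = True
        try:
            # Yield to the event loop, as a real database round trip would.
            await asyncio.sleep(0)
            return f"result of {query}"
        finally:
            self._busy = False


async def shared_session(queries: list[str]) -> list:
    # One session shared by all tasks: the second task finds it busy.
    session = ToySession()
    return await asyncio.gather(
        *(session.execute(q) for q in queries),
        return_exceptions=True,
    )


async def session_per_task(queries: list[str]) -> list[str]:
    # Each task creates and uses its own session, so nothing overlaps.
    async def run(query: str) -> str:
        session = ToySession()
        return await session.execute(query)

    return await asyncio.gather(*(run(q) for q in queries))
```

With a shared session, every task after the first fails while the first is still awaiting; with one session per task, all queries complete.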
-
@Tishka17 Yes, I figured that out. Since this seems to be a common problem with async code and dishka, I thought there would be a go-to solution for it?
-
If you really need multiple sessions while processing a single event, you can change the scope for it. We have
-
I am closing this issue. Feel free to open a new one if you have any proposals.
-
Thanks. For example, we do this now:

@router.get('/get_organisations_feed')
@inject
async def get_feed(
    feed_service: Annotated[FeedService, FromDishka()],
    container: FromDishka[AsyncContainer],
):
    organisation_ids = [
        o['id'] for o in [
            {"id": "eb1e0914-37c1-4748-b754-48d4cdc8bb61"},
            {"id": "4965d7a6-0653-43a4-ac12-6149439d2683"},
            {"id": "1580acf2-03db-40b0-9418-8f38a3b549b2"},
            {"id": "db6d777a-430d-4194-9189-0da878d7ca08"},
            {"id": "e1383451-08e0-481b-9482-db4c879008c8"},
            {"id": "f1c06a44-16fc-4791-ba8d-f5a05b235538"},
        ]
    ]
    feed = await feed_service.get_organisations_feed(organisation_ids, container=container)
    return {
    }

Then in the FeedService we pass the container:

class FeedService:
    def __init__(self,
                 feed_item_repository: FeedItemRepository,
                 nats_service: NATSService
                 ) -> None:
        self._feed_item_repository: FeedItemRepository = feed_item_repository
        self.nats_service = nats_service

    async def get_organisation_feed(self, organisation_id: str, page: int = None, size: int = 20) -> List[GetFeedItemResponse]:
        """
        get organisation feed from NATS cache or database
        """
        return await self._feed_item_repository.get_for_organisation(organisation_id, page, size, load_all=True)

    async def get_organisations_feed(
        self,
        organisation_ids: List[str],
        container: AsyncContainer,
        page: int = None,
    ) -> List[FeedItem]:
        tasks = []
        for organisation_id in organisation_ids:
            async with container() as action_container:
                session = action_container.get(AsyncSession)
                tasks.append(asyncio.create_task(self.get_organisation_feed(organisation_id)))
        return await asyncio.gather(*tasks)

And the FeedItemRepo is a subclass of the BaseRepo:

class BaseRepository(Generic[T]):
    _model: T  # TODO: find out if this is best solution

    def __init__(self, session: AsyncSession):
        self.session = session


class FeedItemRepository(BaseRepository[FeedItem]):
    _model = FeedItem

    async def get(self, feed_item: FeedItem) -> FeedItem:
        q = select(self._model) \
            .filter(self._model.organisation_id == feed_item.organisation_id) \
            .filter(self._model.user_id == feed_item.user_id) \
            .filter(self._model.event_id == feed_item.event_id) \
            .filter(self._model.group_id == feed_item.group_id) \
            .filter(self._model.message_id == feed_item.message_id) \
            .filter(self._model.parent_message_id == feed_item.parent_message_id) \
            .filter(self._model.type == feed_item.type)
        return (await self.session.execute(q)).scalar()

    async def get_for_organisation(self,
                                   organisation_id: str,
                                   page: int = None,
                                   size: int = None,
                                   load_all: bool = False,
                                   before: AwareDatetime | None = None,
                                   after: AwareDatetime | None = None,
                                   type: FeedItemType | None = None
                                   ) -> list[FeedItem]:
        q = select(self._model).filter(self._model.organisation_id == organisation_id)
        .....

My code is not working as I am not sure how the
-
As I see, your FeedItemRepository and Session have scope REQUEST. My suggestion is to change their scope to ACTION and modify the service code like this:

class FeedService:
    def __init__(self, nats_service: NATSService) -> None:
        self.nats_service = nats_service

    async def get_organisation_feed(
        self,
        container: AsyncContainer,
        organisation_id: str, page: int = None, size: int = 20,
    ) -> List[GetFeedItemResponse]:
        # Each call enters its own ACTION scope, so it gets its own repository and session.
        async with container() as action_container:
            repo = await action_container.get(FeedItemRepository)
            return await repo.get_for_organisation(organisation_id, page, size, load_all=True)

    async def get_organisations_feed(
        self,
        organisation_ids: List[str],
        container: AsyncContainer,
        page: int = None,
    ) -> List[FeedItem]:
        tasks = []
        for organisation_id in organisation_ids:
            tasks.append(asyncio.create_task(self.get_organisation_feed(container, organisation_id)))
        return await asyncio.gather(*tasks)
-
Multiple asynchronous SQLAlchemy calls are not working properly with the following code; the synchronous version does.
I read that I need one AsyncSession per asyncio task. How can I instruct Dishka to handle this?
Feed service:
Feed Item repo:
When I call the FeedService.get_organisations_feed method, it complains about the following: