-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
local_kernel_process_context.py
67 lines (49 loc) · 2.46 KB
/
local_kernel_process_context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Copyright (c) Microsoft. All rights reserved.
from typing import TYPE_CHECKING
from semantic_kernel.kernel import Kernel
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.processes.local_runtime.local_process import LocalProcess
from semantic_kernel.utils.experimental_decorator import experimental_class
if TYPE_CHECKING:
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent
@experimental_class
class LocalKernelProcessContext(KernelBaseModel):
"""A local kernel process context."""
local_process: LocalProcess
def __init__(self, process: "KernelProcess", kernel: "Kernel"):
"""Initializes the local kernel process context."""
from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess # noqa: F401
LocalProcess.model_rebuild()
if not process or not process.state or not process.state.name.strip():
raise ValueError("Process and process state must be provided and have a valid name")
if not kernel:
raise ValueError("Kernel must be provided")
local_process = LocalProcess(
process=process,
kernel=kernel,
parent_process_id=None,
)
super().__init__(local_process=local_process)
async def start_with_event(self, initial_event: "KernelProcessEvent") -> None:
"""Starts the local process with an initial event."""
await self.local_process.run_once(initial_event)
async def send_event(self, process_event: "KernelProcessEvent") -> None:
"""Sends an event to the process."""
await self.local_process.send_message(process_event)
async def stop(self) -> None:
"""Stops the local process."""
await self.local_process.stop()
async def get_state(self) -> "KernelProcess":
"""Gets the current state of the process."""
return await self.local_process.get_process_info()
async def __aenter__(self):
"""Enters the async context (used for resource management)."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exits the async context and disposes of resources."""
await self.dispose()
async def dispose(self) -> None:
"""Disposes of the resources used by the process."""
if self.local_process:
self.local_process.dispose()