-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
local_process.py
232 lines (187 loc) · 9.63 KB
/
local_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import contextlib
import logging
import uuid
from queue import Queue
from typing import TYPE_CHECKING, Any
from pydantic import Field
from semantic_kernel.exceptions import KernelException
from semantic_kernel.exceptions.process_exceptions import ProcessEventUndefinedException
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes.const import END_PROCESS_ID
from semantic_kernel.processes.kernel_process.kernel_process_state import KernelProcessState
from semantic_kernel.processes.kernel_process.kernel_process_step_info import KernelProcessStepInfo
from semantic_kernel.processes.local_runtime.local_event import (
KernelProcessEvent,
KernelProcessEventVisibility,
LocalEvent,
)
from semantic_kernel.processes.local_runtime.local_message import LocalMessage
from semantic_kernel.processes.local_runtime.local_message_factory import LocalMessageFactory
from semantic_kernel.processes.local_runtime.local_step import LocalStep
from semantic_kernel.utils.experimental_decorator import experimental_class
if TYPE_CHECKING:
    # Needed only for annotations; the methods that use KernelProcess at
    # runtime import it locally (presumably to avoid an import cycle).
    from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess

# Module-level logger shared by everything in this file.
logger: logging.Logger = logging.getLogger(__name__)
@experimental_class
class LocalProcess(LocalStep):
    """A local process that contains a collection of steps.

    The process creates one runtime step (a ``LocalStep`` or a nested
    ``LocalProcess``) per entry in ``step_infos`` and drives them in
    supersteps: external events and events emitted by the steps are turned
    into messages, which are then dispatched to their destination steps.
    """

    # Kernel handed to every step created by this process.
    kernel: Kernel
    # Runtime steps; populated by ``initialize_process``.
    steps: list[LocalStep] = Field(default_factory=list)
    # Step definitions copied from ``process.steps`` at construction time.
    step_infos: list[KernelProcessStepInfo] = Field(default_factory=list)
    # The process definition this runtime object executes.
    process: "KernelProcess"
    # Despite the name this is a flag: True once ``initialize_process`` has run.
    initialize_task: bool | None = False
    # Events submitted from outside; drained at the start of every superstep.
    external_event_queue: Queue = Field(default_factory=Queue)
    # Handle of the task running ``internal_execute``, if one was started.
    process_task: asyncio.Task | None = None

    def __init__(self, process: "KernelProcess", kernel: Kernel, parent_process_id: str | None = None):
        """Initializes the local process.

        Args:
            process: The process definition. It is also passed to the base
                class as ``step_info``.
            kernel: The kernel used by the process and by the steps it creates.
            parent_process_id: Id of the enclosing process, if any.
        """
        args: dict[str, Any] = {
            "step_info": process,
            "kernel": kernel,
            "parent_process_id": parent_process_id,
            # Copy so later changes to ``process.steps`` do not affect this instance.
            "step_infos": list(process.steps),
            "process": process,
            "initialize_task": False,
        }

        super().__init__(**args)

    def ensure_initialized(self) -> None:
        """Ensures the process is initialized lazily (synchronously).

        ``initialize_process`` is run at most once; the ``initialize_task``
        flag records that it has completed.
        """
        if not self.initialize_task:
            self.initialize_process()
            self.initialize_task = True

    async def start(self, keep_alive: bool = True) -> None:
        """Starts the process as a background task.

        Args:
            keep_alive: Forwarded to ``internal_execute``.
        """
        self.ensure_initialized()
        self.process_task = asyncio.create_task(self.internal_execute(keep_alive=keep_alive))

    async def run_once(self, process_event: KernelProcessEvent) -> None:
        """Starts the process with an initial event and waits for it to finish.

        Args:
            process_event: The event to queue before the process is started.

        Raises:
            ProcessEventUndefinedException: If ``process_event`` is None.
        """
        if process_event is None:
            raise ProcessEventUndefinedException("The process event must be specified.")

        self.external_event_queue.put(process_event)
        await self.start(keep_alive=False)
        if self.process_task:
            await self.process_task

    async def stop(self) -> None:
        """Stops a running process.

        Cancels the process task and waits for it to finish; the resulting
        ``CancelledError`` is suppressed. Does nothing when no task was
        started or the task is already done.
        """
        if not self.process_task or self.process_task.done():
            return  # Task is already finished or hasn't started

        self.process_task.cancel()

        with contextlib.suppress(asyncio.CancelledError):
            await self.process_task

    async def initialize_step(self) -> None:
        """Initializes the step."""
        # The process does not need any further initialization
        pass

    async def send_message(self, process_event: KernelProcessEvent) -> None:
        """Sends a message to the process.

        The event is only queued here; it is picked up by
        ``enqueue_external_messages`` during a superstep.

        Args:
            process_event: The event to queue.

        Raises:
            ProcessEventUndefinedException: If ``process_event`` is None.
        """
        if process_event is None:
            raise ProcessEventUndefinedException("The process event must be specified.")
        self.external_event_queue.put(process_event)

    async def get_process_info(self) -> "KernelProcess":
        """Gets the process information."""
        return await self.to_kernel_process()

    def initialize_process(self) -> None:
        """Initializes the input and output edges for the process and initializes the steps.

        Appends one runtime step per entry in ``step_infos`` to ``steps``:
        a nested ``LocalProcess`` for ``KernelProcess`` entries and a
        ``LocalStep`` for everything else.
        """
        from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess

        # Initialize the input and output edges for the process
        # (each edge list is copied so the definition's lists are not shared).
        self.output_edges = {event_id: list(edges) for event_id, edges in self.process.edges.items()}

        # Initialize the steps within this process
        for step in self.step_infos:
            # The current step should already have a name.
            # NOTE: asserts are stripped under ``python -O``.
            assert step.state and step.state.name is not None  # nosec

            local_step: LocalStep
            if isinstance(step, KernelProcess):
                # The process will only have an Id if it's already been executed.
                if not step.state.id:
                    step.state.id = str(uuid.uuid4().hex)

                # Create a new LocalProcess for the step
                local_step = LocalProcess(
                    process=step,
                    kernel=self.kernel,
                    parent_process_id=self.id,
                )
            else:
                # The current step should already have an Id.
                assert step.state and step.state.id is not None  # nosec

                # Create a LocalStep for the step
                local_step = LocalStep(
                    step_info=step,
                    kernel=self.kernel,
                    parent_process_id=self.id,
                )

            # Add the local step to the list of steps
            self.steps.append(local_step)

    async def handle_message(self, message: LocalMessage) -> None:
        """Handles a LocalMessage that has been sent to the process.

        When the message's target event id is a key of ``output_edges``, the
        event is wrapped in an internal-visibility ``KernelProcessEvent`` and
        the process is run once with it. Messages for any other event id are
        ignored.

        Args:
            message: The message addressed to this process.

        Raises:
            KernelException: If the message has no target event id.
        """
        if not message.target_event_id:
            error_message = (
                "Internal Process Error: The target event id must be specified when sending a message to a step."
            )
            logger.error(error_message)
            raise KernelException(error_message)

        event_id = message.target_event_id
        if event_id in self.output_edges:
            nested_event = KernelProcessEvent(
                id=event_id, data=message.target_event_data, visibility=KernelProcessEventVisibility.Internal
            )
            await self.run_once(nested_event)

    async def internal_execute(self, max_supersteps: int = 100, keep_alive: bool = True) -> None:
        """Internal execution logic for the process.

        Each superstep collects messages from the external event queue and
        from the events emitted by every step, then dispatches them
        concurrently to their destination steps.

        Args:
            max_supersteps: Upper bound on the number of supersteps to run.
            keep_alive: When a superstep produces no messages the loop ends,
                unless this is True and external events are already waiting.

        Raises:
            KernelException: If a message is addressed to a step id that does
                not exist in this process.
            Exception: Any other error raised while running is logged and
                re-raised unchanged.
        """
        message_channel: Queue[LocalMessage] = Queue()

        try:
            for _ in range(max_supersteps):
                self.enqueue_external_messages(message_channel)
                for step in self.steps:
                    await self.enqueue_step_messages(step, message_channel)

                # Drain without blocking: a blocking ``get`` would stall the event loop.
                messages_to_process: list[LocalMessage] = []
                while not message_channel.empty():
                    messages_to_process.append(message_channel.get_nowait())

                if not messages_to_process and (not keep_alive or self.external_event_queue.empty()):
                    break

                # Resolve every destination before creating any coroutine, so a
                # routing error cannot leave handler coroutines un-awaited.
                deliveries: list[tuple[LocalStep, LocalMessage]] = []
                for message in messages_to_process:
                    if message.destination_id == END_PROCESS_ID:
                        # NOTE(review): this only stops dispatching the remaining
                        # messages of this superstep; the superstep loop itself
                        # keeps running. Confirm that is the intended end behavior.
                        break

                    # A default is required here: a bare ``next()`` would raise
                    # StopIteration, which a coroutine turns into an opaque RuntimeError.
                    destination_step = next(
                        (step for step in self.steps if step.id == message.destination_id),
                        None,
                    )
                    if destination_step is None:
                        raise KernelException(
                            "Internal Process Error: No step with id "
                            f"'{message.destination_id}' exists in this process."
                        )
                    deliveries.append((destination_step, message))

                await asyncio.gather(*(target.handle_message(msg) for target, msg in deliveries))

        except Exception as ex:
            logger.error(f"An error occurred while running the process: {ex}.")
            raise

    async def to_kernel_process(self) -> "KernelProcess":
        """Builds a KernelProcess from the current LocalProcess.

        The result is assembled from this process's name and id, the step
        info reported by every runtime step, and the current output edges.
        """
        from semantic_kernel.processes.kernel_process.kernel_process import KernelProcess

        process_state = KernelProcessState(self.name, self.id)
        step_tasks = [step.to_kernel_process_step_info() for step in self.steps]
        steps = await asyncio.gather(*step_tasks)
        return KernelProcess(state=process_state, steps=steps, edges=self.output_edges)

    async def to_kernel_process_step_info(self) -> "KernelProcessStepInfo":
        """Extracts the current state of the step and returns it as a KernelProcessStepInfo."""
        return await self.to_kernel_process()

    def enqueue_external_messages(self, message_channel: Queue[LocalMessage]) -> None:
        """Processes external events that have been sent to the process.

        Drains ``external_event_queue``. For every event whose id is a key of
        ``output_edges``, one message per edge is put on ``message_channel``;
        events with any other id are dropped.

        Args:
            message_channel: The queue that receives the created messages.
        """
        while not self.external_event_queue.empty():
            external_event: KernelProcessEvent = self.external_event_queue.get_nowait()
            if external_event.id in self.output_edges:
                edges = self.output_edges[external_event.id]
                for edge in edges:
                    message = LocalMessageFactory.create_from_edge(edge, external_event.data)
                    message_channel.put(message)

    async def enqueue_step_messages(self, step: LocalStep, message_channel: Queue[LocalMessage]) -> None:
        """Processes events emitted by the given step and enqueues them.

        Public events are additionally re-emitted from this process, so they
        become visible outside it. Every event, public or not, is turned into
        one message per edge the step has registered for that event id.

        Args:
            step: The step whose pending events are collected.
            message_channel: The queue that receives the created messages.
        """
        all_step_events = step.get_all_events()
        for step_event in all_step_events:
            if step_event.visibility == KernelProcessEventVisibility.Public:
                # emit_event / emit_local_event are not defined in this class
                # (presumably inherited from LocalStep).
                if isinstance(step_event, KernelProcessEvent):
                    await self.emit_event(step_event)  # type: ignore
                elif isinstance(step_event, LocalEvent):
                    await self.emit_local_event(step_event)  # type: ignore

            for edge in step.get_edge_for_event(step_event.id):
                message = LocalMessageFactory.create_from_edge(edge, step_event.data)
                message_channel.put(message)

    def dispose(self) -> None:
        """Clean up resources.

        Requests cancellation of the process task, if one exists, without
        waiting for it to finish.
        """
        if self.process_task:
            self.process_task.cancel()