-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
assistant_content_generation.py
388 lines (330 loc) · 14.6 KB
/
assistant_content_generation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# Copyright (c) Microsoft. All rights reserved.

from typing import TYPE_CHECKING, Any

from openai import AsyncOpenAI
from openai.types.beta.threads.file_citation_delta_annotation import FileCitationDeltaAnnotation
from openai.types.beta.threads.file_path_delta_annotation import FilePathDeltaAnnotation
from openai.types.beta.threads.image_file_content_block import ImageFileContentBlock
from openai.types.beta.threads.image_file_delta_block import ImageFileDeltaBlock
from openai.types.beta.threads.message_delta_event import MessageDeltaEvent
from openai.types.beta.threads.runs import CodeInterpreterLogs, CodeInterpreterOutputImage
from openai.types.beta.threads.runs.code_interpreter_tool_call import CodeInterpreter
from openai.types.beta.threads.text_content_block import TextContentBlock
from openai.types.beta.threads.text_delta_block import TextDeltaBlock

from semantic_kernel.contents.annotation_content import AnnotationContent
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.file_reference_content import FileReferenceContent
from semantic_kernel.contents.function_call_content import FunctionCallContent
from semantic_kernel.contents.function_result_content import FunctionResultContent
from semantic_kernel.contents.image_content import ImageContent
from semantic_kernel.contents.streaming_annotation_content import StreamingAnnotationContent
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.contents.streaming_file_reference_content import StreamingFileReferenceContent
from semantic_kernel.contents.streaming_text_content import StreamingTextContent
from semantic_kernel.contents.text_content import TextContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.exceptions.agent_exceptions import AgentExecutionException
from semantic_kernel.utils.experimental_decorator import experimental_function

if TYPE_CHECKING:
    from openai.resources.beta.threads.messages import Message
    from openai.resources.beta.threads.runs.runs import Run
    from openai.types.beta.threads.annotation import Annotation
    from openai.types.beta.threads.runs import RunStep
    from openai.types.beta.threads.runs.tool_call import ToolCall
    from openai.types.beta.threads.runs.tool_calls_step_details import ToolCallsStepDetails
###################################################################
# The methods in this file are used with OpenAIAssistantAgent #
# related code. They are used to create chat messages, or #
# generate message content. #
###################################################################
@experimental_function
async def create_chat_message(
    client: AsyncOpenAI,
    thread_id: str,
    message: "ChatMessageContent",
    allowed_message_roles: list[str] | None = None,
) -> "Message":
    """Add a chat message to the given thread.

    Args:
        client: The client to use for creating the message.
        thread_id: The thread id.
        message: The chat message.
        allowed_message_roles: The allowed message roles. Defaults to
            [AuthorRole.USER, AuthorRole.ASSISTANT] when None.

    Returns:
        Message: The created message.

    Raises:
        AgentExecutionException: If the message role is not allowed.
    """
    # Resolve the default per call instead of using a mutable default argument,
    # which would be shared (and mutable) across all invocations.
    if allowed_message_roles is None:
        allowed_message_roles = [AuthorRole.USER, AuthorRole.ASSISTANT]
    # TOOL messages are always accepted; they are forwarded with the assistant role below.
    if message.role.value not in allowed_message_roles and message.role != AuthorRole.TOOL:
        raise AgentExecutionException(
            f"Invalid message role `{message.role.value}`. Allowed roles are {allowed_message_roles}."
        )
    message_contents: list[dict[str, Any]] = get_message_contents(message=message)
    return await client.beta.threads.messages.create(
        thread_id=thread_id,
        role="assistant" if message.role == AuthorRole.TOOL else message.role.value,  # type: ignore
        content=message_contents,  # type: ignore
    )
@experimental_function
def get_message_contents(message: "ChatMessageContent") -> list[dict[str, Any]]:
    """Convert a chat message's items into OpenAI thread-message content dicts.

    Args:
        message: The message.

    Returns:
        The list of content dictionaries in the shape the OpenAI messages API expects.
        Unrecognized item types are skipped.
    """
    contents: list[dict[str, Any]] = []
    for content in message.items:
        if isinstance(content, TextContent):
            contents.append({"type": "text", "text": content.text})
        elif isinstance(content, ImageContent) and content.uri:
            contents.append(content.to_dict())
        elif isinstance(content, FileReferenceContent):
            contents.append({
                "type": "image_file",
                "image_file": {"file_id": content.file_id},
            })
        elif isinstance(content, FunctionResultContent):
            # The API requires the "text" value to be a string; function results
            # may be arbitrary objects, so coerce explicitly.
            contents.append({"type": "text", "text": str(content.result)})
    return contents
@experimental_function
def generate_message_content(
    assistant_name: str, message: "Message", completed_step: "RunStep | None" = None
) -> ChatMessageContent:
    """Build a ChatMessageContent from an OpenAI thread message.

    When a completed run step is supplied, its identifiers and usage are
    attached as metadata on the resulting message.
    """
    metadata = None
    if completed_step is not None:
        metadata = {
            "created_at": completed_step.created_at,
            "message_id": message.id,
            "step_id": completed_step.id,
            "run_id": completed_step.run_id,
            "thread_id": completed_step.thread_id,
            "assistant_id": completed_step.assistant_id,
            "usage": completed_step.usage,
        }
    result: ChatMessageContent = ChatMessageContent(
        role=AuthorRole(message.role), name=assistant_name, metadata=metadata
    )  # type: ignore
    for block in message.content:
        if block.type == "text":
            assert isinstance(block, TextContentBlock)  # nosec
            result.items.append(TextContent(text=block.text.value))
            # Text blocks may carry annotations (file citations / paths); surface each one.
            for annotation in block.text.annotations:
                result.items.append(generate_annotation_content(annotation))
        elif block.type == "image_file":
            assert isinstance(block, ImageFileContentBlock)  # nosec
            result.items.append(FileReferenceContent(file_id=block.image_file.file_id))
    return result
@experimental_function
def generate_streaming_message_content(
    assistant_name: str, message_delta_event: "MessageDeltaEvent"
) -> StreamingChatMessageContent:
    """Build a StreamingChatMessageContent from a MessageDeltaEvent."""
    delta = message_delta_event.delta
    # Fall back to the assistant role when the delta does not carry one.
    role = AuthorRole("assistant") if delta.role is None else AuthorRole(delta.role)
    items: list[StreamingTextContent | StreamingAnnotationContent | StreamingFileReferenceContent] = []
    for block in delta.content or []:
        if block.type == "text":
            assert isinstance(block, TextDeltaBlock)  # nosec
            # Skip empty text deltas entirely (including their annotations).
            if block.text and block.text.value:
                items.append(
                    StreamingTextContent(
                        text=block.text.value,
                        choice_index=block.index,
                    )
                )
                # Only file-citation / file-path annotations are surfaced.
                for annotation in block.text.annotations or []:
                    if isinstance(annotation, (FileCitationDeltaAnnotation, FilePathDeltaAnnotation)):
                        items.append(generate_streaming_annotation_content(annotation))
        elif block.type == "image_file":
            assert isinstance(block, ImageFileDeltaBlock)  # nosec
            if block.image_file and block.image_file.file_id:
                items.append(
                    StreamingFileReferenceContent(
                        file_id=block.image_file.file_id,
                    )
                )
    return StreamingChatMessageContent(role=role, name=assistant_name, items=items, choice_index=0)  # type: ignore
@experimental_function
def generate_function_call_content(agent_name: str, fccs: list[FunctionCallContent]) -> ChatMessageContent:
    """Wrap function call contents in a tool-role chat message.

    Args:
        agent_name: The agent name.
        fccs: The function call contents.

    Returns:
        ChatMessageContent: A tool-role message whose items are the function calls.
    """
    return ChatMessageContent(
        role=AuthorRole.TOOL,
        name=agent_name,
        items=fccs,
    )  # type: ignore
@experimental_function
def generate_function_result_content(
    agent_name: str, function_step: FunctionCallContent, tool_call: "ToolCall"
) -> ChatMessageContent:
    """Build a tool-role chat message carrying a single function result."""
    result_item = FunctionResultContent(
        function_name=function_step.function_name,
        plugin_name=function_step.plugin_name,
        id=function_step.id,
        result=tool_call.function.output,  # type: ignore
    )
    message: ChatMessageContent = ChatMessageContent(role=AuthorRole.TOOL, name=agent_name)  # type: ignore
    message.items.append(result_item)
    return message
@experimental_function
def get_function_call_contents(run: "Run", function_steps: dict[str, FunctionCallContent]) -> list[FunctionCallContent]:
    """Extract function call contents from the run.

    Also records each extracted call in ``function_steps`` (keyed by tool call
    id), mutating the mapping in place.

    Args:
        run: The run.
        function_steps: The function steps, updated in place.

    Returns:
        The list of function call contents; empty when no tool outputs are required.
    """
    required_action = getattr(run, "required_action", None)
    # No pending action, or an action that is not a tool-output submission: nothing to do.
    if not required_action or not getattr(required_action, "submit_tool_outputs", False):
        return []
    calls: list[FunctionCallContent] = []
    for tool_call in required_action.submit_tool_outputs.tool_calls:
        content = FunctionCallContent(
            id=tool_call.id,
            index=getattr(tool_call, "index", None),
            name=tool_call.function.name,
            arguments=tool_call.function.arguments,
        )
        calls.append(content)
        function_steps[tool_call.id] = content
    return calls
@experimental_function
def generate_code_interpreter_content(agent_name: str, code: str) -> "ChatMessageContent":
    """Wrap generated code in an assistant chat message.

    Args:
        agent_name: The agent name.
        code: The code.

    Returns:
        ChatMessageContent: An assistant message whose ``metadata["code"]`` flag
            marks the content as code.
    """
    code_metadata = {"code": True}
    return ChatMessageContent(
        role=AuthorRole.ASSISTANT,
        name=agent_name,
        content=code,
        metadata=code_metadata,
    )
@experimental_function
def generate_streaming_function_content(
    agent_name: str, step_details: "ToolCallsStepDetails"
) -> "StreamingChatMessageContent | None":
    """Generate streaming function content.

    Args:
        agent_name: The agent name.
        step_details: The function step.

    Returns:
        StreamingChatMessageContent: The chat message content carrying the
            function calls, or None when the step contains no function tool
            calls. (Annotation fixed: the original declared a non-optional
            return but returned None on the empty case.)
    """
    items: list[FunctionCallContent] = [
        FunctionCallContent(
            id=tool.id,
            index=getattr(tool, "index", None),
            name=tool.function.name,
            arguments=tool.function.arguments,
        )
        for tool in step_details.tool_calls
        if tool.type == "function"
    ]
    if not items:
        # Consistent with generate_streaming_code_interpreter_content: nothing relevant -> None.
        return None
    return StreamingChatMessageContent(
        role=AuthorRole.ASSISTANT,
        name=agent_name,
        items=items,  # type: ignore
        choice_index=0,
    )
@experimental_function
def generate_streaming_code_interpreter_content(
    agent_name: str, step_details: "ToolCallsStepDetails"
) -> "StreamingChatMessageContent | None":
    """Generate code interpreter content.

    Args:
        agent_name: The agent name.
        step_details: The current step details.

    Returns:
        StreamingChatMessageContent: The chat message content, or None when the
            step produced no code interpreter input or outputs.
    """
    items: list[StreamingTextContent | StreamingFileReferenceContent] = []
    metadata: dict[str, bool] = {}
    for index, tool in enumerate(step_details.tool_calls):
        if tool.type != "code_interpreter":
            continue
        if tool.code_interpreter.input:
            items.append(
                StreamingTextContent(
                    choice_index=index,
                    text=tool.code_interpreter.input,
                )
            )
            # Flag the content as code so downstream rendering can treat it specially.
            metadata["code"] = True
        for output in tool.code_interpreter.outputs or []:
            # Bugfix: outputs are CodeInterpreterOutputImage / CodeInterpreterLogs
            # entries, not CodeInterpreter (the tool-call payload type), so the
            # previous `isinstance(output, CodeInterpreter)` check never matched
            # and image file references were silently dropped.
            if isinstance(output, CodeInterpreterOutputImage) and output.image and output.image.file_id:
                items.append(
                    StreamingFileReferenceContent(
                        file_id=output.image.file_id,
                    )
                )
            if isinstance(output, CodeInterpreterLogs) and output.logs:
                items.append(
                    StreamingTextContent(
                        choice_index=index,
                        text=output.logs,
                    )
                )
    if not items:
        return None
    return StreamingChatMessageContent(
        role=AuthorRole.ASSISTANT,
        name=agent_name,
        items=items,  # type: ignore
        choice_index=0,
        metadata=metadata if metadata else None,
    )
@experimental_function
def generate_annotation_content(annotation: "Annotation") -> AnnotationContent:
    """Generate annotation content."""
    # A file-path annotation takes precedence over a file citation; otherwise
    # there is no file id to attach.
    if hasattr(annotation, "file_path"):
        file_id = annotation.file_path.file_id
    elif hasattr(annotation, "file_citation"):
        file_id = annotation.file_citation.file_id
    else:
        file_id = None
    return AnnotationContent(
        file_id=file_id,
        quote=annotation.text,
        start_index=annotation.start_index,
        end_index=annotation.end_index,
    )
@experimental_function
def generate_streaming_annotation_content(annotation: "Annotation") -> StreamingAnnotationContent:
    """Generate streaming annotation content."""
    # Delta annotations may omit either attribute or carry it as None/empty,
    # so probe both and normalize falsy file ids to None.
    file_path = getattr(annotation, "file_path", None)
    file_citation = getattr(annotation, "file_citation", None)
    if file_path:
        file_id = file_path.file_id or None
    elif file_citation:
        file_id = file_citation.file_id or None
    else:
        file_id = None
    return StreamingAnnotationContent(
        file_id=file_id,
        quote=annotation.text,
        start_index=annotation.start_index,
        end_index=annotation.end_index,
    )