Skip to content

Commit

Permalink
feat: add AssemblyAI components (langflow-ai#3829)
Browse files Browse the repository at this point in the history
* Add AssemblyAI components

* add icons

* [autofix.ci] apply automated fixes

* Add ruff fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
  • Loading branch information
3 people authored and diogocabral committed Nov 26, 2024
1 parent ded7b4d commit 1ac743f
Show file tree
Hide file tree
Showing 14 changed files with 1,419 additions and 2,033 deletions.
508 changes: 267 additions & 241 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ langchain-cohere = "^0.1.5"
elasticsearch = "^8.12.0"
pytube = "^15.0.0"
dspy-ai = "^2.4.0"
assemblyai = "^0.26.0"
assemblyai = "^0.33.0"
litellm = "^1.44.0"
chromadb = "^0.4"
langchain-anthropic = "^0.1.23"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import datetime
from typing import Dict, List

from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data


class AssemblyAITranscriptionParser(Component):
display_name = "AssemblyAI Parse Transcript"
description = "Parse AssemblyAI transcription result. If Speaker Labels was enabled, format utterances with speakers and timestamps"
documentation = "https://www.assemblyai.com/docs"
icon = "AssemblyAI"

inputs = [
DataInput(
name="transcription_result",
display_name="Transcription Result",
info="The transcription result from AssemblyAI",
),
]

outputs = [
Output(display_name="Parsed Transcription", name="parsed_transcription", method="parse_transcription"),
]

def parse_transcription(self) -> Data:
# check if it's an error message from the previous step
if self.transcription_result.data.get("error"):
self.status = self.transcription_result.data["error"]
return self.transcription_result

try:
transcription_data = self.transcription_result.data

if transcription_data.get("utterances"):
# If speaker diarization was enabled
parsed_result = self.parse_with_speakers(transcription_data["utterances"])
elif transcription_data.get("text"):
# If speaker diarization was not enabled
parsed_result = transcription_data["text"]
else:
raise ValueError("Unexpected transcription format")

self.status = parsed_result
return Data(data={"text": parsed_result})
except Exception as e:
error_message = f"Error parsing transcription: {str(e)}"
self.status = error_message
return Data(data={"error": error_message})

def parse_with_speakers(self, utterances: List[Dict]) -> str:
parsed_result = []
for utterance in utterances:
speaker = utterance["speaker"]
start_time = self.format_timestamp(utterance["start"])
text = utterance["text"]
parsed_result.append(f'Speaker {speaker} {start_time}\n"{text}"\n')

return "\n".join(parsed_result)

def format_timestamp(self, milliseconds: int) -> str:
return str(datetime.timedelta(milliseconds=milliseconds)).split(".")[0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import assemblyai as aai

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, IntInput, Output, SecretStrInput
from langflow.schema import Data


class AssemblyAIGetSubtitles(Component):
display_name = "AssemblyAI Get Subtitles"
description = "Export your transcript in SRT or VTT format for subtitles and closed captions"
documentation = "https://www.assemblyai.com/docs"
icon = "AssemblyAI"

inputs = [
SecretStrInput(
name="api_key",
display_name="Assembly API Key",
info="Your AssemblyAI API key. You can get one from https://www.assemblyai.com/",
),
DataInput(
name="transcription_result",
display_name="Transcription Result",
info="The transcription result from AssemblyAI",
),
DropdownInput(
name="subtitle_format",
display_name="Subtitle Format",
options=["srt", "vtt"],
value="srt",
info="The format of the captions (SRT or VTT)",
),
IntInput(
name="chars_per_caption",
display_name="Characters per Caption",
info="The maximum number of characters per caption (0 for no limit)",
value=0,
advanced=True,
),
]

outputs = [
Output(display_name="Subtitles", name="subtitles", method="get_subtitles"),
]

def get_subtitles(self) -> Data:
aai.settings.api_key = self.api_key

# check if it's an error message from the previous step
if self.transcription_result.data.get("error"):
self.status = self.transcription_result.data["error"]
return self.transcription_result

try:
transcript_id = self.transcription_result.data["id"]
transcript = aai.Transcript.get_by_id(transcript_id)
except Exception as e:
error = f"Getting transcription failed: {str(e)}"
self.status = error
return Data(data={"error": error})

if transcript.status == aai.TranscriptStatus.completed:
subtitles = None
chars_per_caption = self.chars_per_caption if self.chars_per_caption > 0 else None
if self.subtitle_format == "srt":
subtitles = transcript.export_subtitles_srt(chars_per_caption)
else:
subtitles = transcript.export_subtitles_vtt(chars_per_caption)

result = Data(
subtitles=subtitles,
format=self.subtitle_format,
transcript_id=transcript_id,
chars_per_caption=chars_per_caption,
)

self.status = result
return result
else:
self.status = transcript.error
return Data(data={"error": transcript.error})
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import assemblyai as aai

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, FloatInput, IntInput, MessageInput, Output, SecretStrInput
from langflow.schema import Data


class AssemblyAILeMUR(Component):
display_name = "AssemblyAI LeMUR"
description = "Apply Large Language Models to spoken data using the AssemblyAI LeMUR framework"
documentation = "https://www.assemblyai.com/docs/lemur"
icon = "AssemblyAI"

inputs = [
SecretStrInput(
name="api_key",
display_name="Assembly API Key",
info="Your AssemblyAI API key. You can get one from https://www.assemblyai.com/",
advanced=False,
),
DataInput(
name="transcription_result",
display_name="Transcription Result",
info="The transcription result from AssemblyAI",
),
MessageInput(
name="prompt",
display_name="Input Prompt",
info="The text to prompt the model",
),
DropdownInput(
name="final_model",
display_name="Final Model",
options=["claude3_5_sonnet", "claude3_opus", "claude3_haiku", "claude3_sonnet"],
value="claude3_5_sonnet",
info="The model that is used for the final prompt after compression is performed",
),
FloatInput(
name="temperature",
display_name="Temperature",
advanced=True,
value=0.0,
info="The temperature to use for the model",
),
IntInput(
name="max_output_size",
display_name=" Max Output Size",
advanced=True,
value=2000,
info="Max output size in tokens, up to 4000",
),
]

outputs = [
Output(display_name="LeMUR Response", name="lemur_response", method="run_lemur"),
]

def run_lemur(self) -> Data:
"""Use the LeMUR task endpoint to input the LLM prompt."""
aai.settings.api_key = self.api_key

# check if it's an error message from the previous step
if self.transcription_result.data.get("error"):
self.status = self.transcription_result.data["error"]
return self.transcription_result

if not self.prompt or not self.prompt.text:
self.status = "No prompt specified"
return Data(data={"error": "No prompt specified"})

try:
transcript = aai.Transcript.get_by_id(self.transcription_result.data["id"])
except Exception as e:
error = f"Getting transcription failed: {str(e)}"
self.status = error
return Data(data={"error": error})

if transcript.status == aai.TranscriptStatus.completed:
try:
result = transcript.lemur.task(
prompt=self.prompt.text,
final_model=self.get_final_model(self.final_model),
temperature=self.temperature,
max_output_size=self.max_output_size,
)

result = Data(data=result.dict())
self.status = result
return result
except Exception as e:
error = f"An Exception happened while calling LeMUR: {str(e)}"
self.status = error
return Data(data={"error": error})
else:
self.status = transcript.error
return Data(data={"error": transcript.error})

def get_final_model(self, model_name: str) -> aai.LemurModel:
if model_name == "claude3_5_sonnet":
return aai.LemurModel.claude3_5_sonnet
elif model_name == "claude3_opus":
return aai.LemurModel.claude3_opus
elif model_name == "claude3_haiku":
return aai.LemurModel.claude3_haiku
elif model_name == "claude3_sonnet":
return aai.LemurModel.claude3_sonnet
else:
raise ValueError(f"Model name not supported: {model_name}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import List

import assemblyai as aai

from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, IntInput, MessageTextInput, Output, SecretStrInput
from langflow.schema import Data


class AssemblyAIListTranscripts(Component):
display_name = "AssemblyAI List Transcripts"
description = "Retrieve a list of transcripts from AssemblyAI with filtering options"
documentation = "https://www.assemblyai.com/docs"
icon = "AssemblyAI"

inputs = [
SecretStrInput(
name="api_key",
display_name="Assembly API Key",
info="Your AssemblyAI API key. You can get one from https://www.assemblyai.com/",
),
IntInput(
name="limit",
display_name="Limit",
info="Maximum number of transcripts to retrieve (default: 20, use 0 for all)",
value=20,
),
DropdownInput(
name="status_filter",
display_name="Status Filter",
options=["all", "queued", "processing", "completed", "error"],
value="all",
info="Filter by transcript status",
),
MessageTextInput(
name="created_on",
display_name="Created On",
info="Only get transcripts created on this date (YYYY-MM-DD)",
),
BoolInput(
name="throttled_only",
display_name="Throttled Only",
info="Only get throttled transcripts, overrides the status filter",
),
]

outputs = [
Output(display_name="Transcript List", name="transcript_list", method="list_transcripts"),
]

def list_transcripts(self) -> List[Data]:
aai.settings.api_key = self.api_key

params = aai.ListTranscriptParameters()
if self.limit:
params.limit = self.limit
if self.status_filter != "all":
params.status = self.status_filter
if self.created_on and self.created_on.text:
params.created_on = self.created_on.text
if self.throttled_only:
params.throttled_only = True

try:
transcriber = aai.Transcriber()

def convert_page_to_data_list(page):
return [Data(**t.dict()) for t in page.transcripts]

if self.limit == 0:
# paginate over all pages
params.limit = 100
page = transcriber.list_transcripts(params)
transcripts = convert_page_to_data_list(page)

while page.page_details.before_id_of_prev_url is not None:
params.before_id = page.page_details.before_id_of_prev_url
page = transcriber.list_transcripts(params)
transcripts.extend(convert_page_to_data_list(page))
else:
# just one page
page = transcriber.list_transcripts(params)
transcripts = convert_page_to_data_list(page)

self.status = transcripts
return transcripts
except Exception as e:
error_data = Data(data={"error": f"An error occurred: {str(e)}"})
self.status = [error_data]
return [error_data]
Loading

0 comments on commit 1ac743f

Please sign in to comment.