Skip to content

Commit

Permalink
[spark] Add whisper python code
Browse files Browse the repository at this point in the history
  • Loading branch information
xyang16 committed Apr 7, 2023
1 parent f2ad4bd commit 04896cb
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
21 changes: 21 additions & 0 deletions extensions/spark/setup/djl_spark/task/audio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python
#
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file
# except in compliance with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for
# the specific language governing permissions and limitations under the License.

"""DJL Spark Tasks Text API."""

from . import whisper_speech_recognizer

WhisperSpeechRecognizer = whisper_speech_recognizer.WhisperSpeechRecognizer

# Remove unnecessary modules to avoid duplication in API.
del whisper_speech_recognizer
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python
#
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file
# except in compliance with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for
# the specific language governing permissions and limitations under the License.

from pyspark import SparkContext
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import io
import librosa
import pandas as pd
from typing import Iterator
from transformers import pipeline


class WhisperSpeechRecognizer:

def __init__(self, input_col, output_col, engine, model_url=None, model_name=None):
"""
Initializes the WhisperSpeechRecognizer.
:param input_col: The input column
:param output_col: The output column
:param engine: The engine. Currently only PyTorch is supported.
:param model_url: The model URL
:param model_name: The model name
"""
self.input_col = input_col
self.output_col = output_col
self.engine = engine
self.model_url = model_url
self.model_name = model_name

def recognize(self, dataset, **kwargs):
"""
Performs speech recognition on the provided dataset.
:param dataset: input dataset
:return: output dataset
"""
sc = SparkContext._active_spark_context
if not self.model_url and not self.model_name:
raise ValueError("Either model_url or model_name must be provided.")
model_name_or_url = self.model_url if self.model_url else self.model_name
pipe = pipeline("automatic-speech-recognition", model=model_name_or_url, chunk_length_s=30, **kwargs)
bc_pipe = sc.broadcast(pipe)

@pandas_udf(StringType())
def predict_udf(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
batch = []
for s in iterator:
for d in s:
# Model expects single channel, 16000 sample rate audio
data, sample_rate = librosa.load(io.BytesIO(d), mono=True, sr=16000)
batch.append(data)
output = bc_pipe.value(batch)
text = [o["text"] for o in output]
yield pd.Series(text)

return dataset.withColumn(self.output_col, predict_udf(self.input_col))

0 comments on commit 04896cb

Please sign in to comment.