Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/openbb apachebeam #6679

Merged
Merged
12 changes: 12 additions & 0 deletions examples/openbb-apachebeam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# OBB Dataflow Sample


This is a sample how to invoke OBB fetchers in an Apache Beam pipeline. (GCP dataflow is build on Apache Beam)
Pre-requisites
- You need to create a Conda environment (or a virtual env) using requirements.txt in this directory
- The script exercise 3 OBB endpoints, all of which require no credentials
- Run the test from the main directory
python -m unittest .\tests\test_obb_pipeline.py

The script will run a pipeline consisting of 3 task which will fetch AAPL quote, profile and news
This is just a very basic sample which can be used as building block to create more complex scenarios
2 changes: 2 additions & 0 deletions examples/openbb-apachebeam/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
apache-beam
openbb-yfinance
Empty file.
48 changes: 48 additions & 0 deletions examples/openbb-apachebeam/tests/test_obb_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import unittest
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
import asyncio
import apache_beam as beam
from openbb_yfinance.models.equity_quote import YFinanceEquityQuoteFetcher as quote_fetcher
from openbb_yfinance.models.equity_profile import YFinanceEquityProfileFetcher as profile_fetcher
from openbb_yfinance.models.company_news import YFinanceCompanyNewsFetcher as news_fetcher


class AsyncProcess(beam.DoFn):

def __init__(self, credentials, fetcher):
self.credentials = credentials
self.fetcher = fetcher

async def fetch_data(self, element: str):
params = dict(symbol=element)
data = await self.fetcher.fetch_data(params, self.credentials)
return [d.model_dump(exclude_none=True) for d in data]

def process(self, element: str):
return asyncio.run(self.fetch_data(element))

class MyTestCase(unittest.TestCase):


def test_sample_pipeline(self):
credentials = {} # Running OBB endpoints which do not require credentials
debug_sink = beam.Map(print)
ticker = 'AAPL'

with TestPipeline(options=PipelineOptions()) as p:
quote = (p | 'Start Quote' >> beam.Create([ticker])
| 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher))
| 'Print quote' >> debug_sink)

profile = (p | 'Start Profile' >> beam.Create([ticker])
| 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher))
| 'Print profile' >> debug_sink)

news = (p | 'Start News' >> beam.Create([ticker])
| 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher))
| 'Print nes' >> debug_sink)


if __name__ == '__main__':
unittest.main()
Loading