12/21/2023 Dask Demo Day notebook #1

Open · wants to merge 3 commits into base: main
362 changes: 362 additions & 0 deletions demo.ipynb
@@ -0,0 +1,362 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "f4cf4340",
"metadata": {},
"source": [
"# Install dependencies\n",
"- `apache-beam[dask]`: core package being demonstrated here\n",
"- `bokeh`: for dask dashboard\n",
"- `mimesis`: required for generating example data\n",
"- `Pygments`: to `cat` example beam script with syntax highlighting"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "47fc8d72-7d1b-4c4a-96c4-2708c98ba5da",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# !pip install \"apache-beam[dask]\" \"bokeh!=3.0.*,>=2.4.2\" mimesis Pygments"
]
},
{
"cell_type": "markdown",
"id": "25c02c35",
"metadata": {},
"source": [
"Pinning the upper bound of `dask` & `distributed` to `2023.9.2` as a workaround until\n",
"[this fix](https://github.com/apache/beam/pull/27618/files#diff-bfb5ae715e9067778f492058e8a02ff877d6e7584624908ddbdd316853e6befbL102-R107)\n",
"goes in.\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "118a3bf9",
"metadata": {},
"outputs": [],
"source": [
"# !pip install -U \"distributed>=2022.6.0,<2023.9.3\""
]
},
{
"cell_type": "markdown",
"id": "b2495371",
"metadata": {},
"source": [
"# Start a client"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "740ade56-6320-495d-ae3e-ebf1c61bb749",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'http://127.0.0.1:8787/status'"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from distributed import Client\n",
"client = Client()\n",
"client.dashboard_link"
]
},
{
"cell_type": "markdown",
"id": "6d551efb",
"metadata": {},
"source": [
"# Example data\n",
"\n",
"Based on https://examples.dask.org/bag.html#Create-Random-Data"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "f2b35bd7-d903-4953-a043-aa4f351938d6",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/0.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/1.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/2.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/3.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/4.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/5.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/6.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/7.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/8.json',\n",
" '/var/folders/tt/4f941hdn0zq549zdwhcgg98c0000gn/T/tmprfxai8wf/9.json']"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask\n",
"import json\n",
"import tempfile\n",
"\n",
"td = tempfile.TemporaryDirectory()\n",
"dask.datasets.make_people().map(json.dumps).to_textfiles(f'{td.name}/*.json')"
]
},
{
"cell_type": "markdown",
"id": "076d3543",
"metadata": {},
"source": [
"Note that the data is in the https://jsonlines.org format:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "652766b9-409f-4d12-998c-8102028b95f6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\"age\": 47, \"name\": [\"Deadra\", \"Landry\"], \"occupation\": \"Moneylender\", \"telephone\": \"+1-956-698-8149\", \"address\": {\"address\": \"993 John Muir Creek\", \"city\": \"Murray\"}, \"credit-card\": {\"number\": \"5157 5211 2472 8259\", \"expiration-date\": \"10/23\"}}\n",
"{\"age\": 26, \"name\": [\"Shirleen\", \"Shaw\"], \"occupation\": \"Lawyer\", \"telephone\": \"+16190953288\", \"address\": {\"address\": \"578 Marina Ferry\", \"city\": \"Santee\"}, \"credit-card\": {\"number\": \"4541 9038 5492 9752\", \"expiration-date\": \"12/24\"}}\n"
]
}
],
"source": [
"!head -n 2 {td.name}/0.json"
]
},
{
"cell_type": "markdown",
"id": "615dbb9e",
"metadata": {},
"source": [
"# Dask\n",
"\n",
"Read, load, and filter the data using the Dask Bag API.\n",
"\n",
"This is based on https://examples.dask.org/bag.html#Chain-computations.\n",
"\n",
"It omits aggregation & sampling (`.frequencies`, `.topk`),\n",
"because these operations cannot yet be replicated with Beam-on-Dask\n",
"(see **Discussion > Next Steps** below)."
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "fca2d92e",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('August Bond', 54, 'Chimney Sweep'),\n",
" ('Adan Bell', 41, 'Carpet Fitter'),\n",
" ('Ashley Berry', 32, 'Cabinet Maker')]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.bag as db\n",
"\n",
"b = (\n",
" db\n",
" .read_text(f'{td.name}/*.json')\n",
" .map(json.loads)\n",
" .filter(lambda record: record['age'] > 30)\n",
" .filter(lambda record: record['name'][0].startswith('A'))\n",
" .filter(lambda record: record['name'][1].startswith('B'))\n",
" .filter(lambda record: record['occupation'].startswith('C'))\n",
" .map(lambda record: (\" \".join(record['name']), record['age'], record['occupation']))\n",
")\n",
"b.compute()"
]
},
{
"cell_type": "markdown",
"id": "b947709b",
"metadata": {},
"source": [
"# Beam\n",
"\n",
"Read, load, and apply the same filters using the Beam API.\n",
"\n",
"Beam's `DaskRunner` doesn't yet support IPython evaluation, so we use a standalone Python script:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "d161ab64-3861-4c1e-a075-1f90fe6c3e7b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"import glob\n",
"import json\n",
"import sys\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"from apache_beam.runners.dask.dask_runner import DaskRunner\n",
"\n",
"\n",
"def yield_jsonlines(fname: str):\n",
"    with open(fname) as f:\n",
"        for line in f.readlines():\n",
"            yield json.loads(line)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
"    tmpdir, pipeline_options = sys.argv[1], sys.argv[2:]\n",
"    opts = dict(runner=DaskRunner(), options=PipelineOptions(pipeline_options))\n",
"    with beam.Pipeline(**opts) as p:\n",
"        (\n",
"            p\n",
"            | beam.Create(glob.glob(f'{tmpdir}/*.json'))\n",
"            | beam.FlatMap(yield_jsonlines)\n",
"            | beam.Filter(lambda record: record['age'] > 30)\n",
"            | beam.Filter(lambda record: record['name'][0].startswith('A'))\n",
"            | beam.Filter(lambda record: record['name'][1].startswith('B'))\n",
"            | beam.Filter(lambda record: record['occupation'].startswith('C'))\n",
"            | beam.Map(lambda record: (\" \".join(record['name']), record['age'], record['occupation']))\n",
"            | beam.Map(print)\n",
"        )\n"
]
}
],
"source": [
"!pygmentize -g example.py"
]
},
{
"cell_type": "markdown",
"id": "aee5e862",
"metadata": {},
"source": [
"And run this computation on the _same Dask cluster_ as we used for the Dask Bag operation:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "5e067186",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"('August Bond', 54, 'Chimney Sweep')\n",
"('Adan Bell', 41, 'Carpet Fitter')\n",
"('Ashley Berry', 32, 'Cabinet Maker')\n"
]
}
],
"source": [
"!python -W ignore example.py {td.name} --dask_client_address={client.scheduler.address}"
]
},
{
"cell_type": "markdown",
"id": "3592007e",
"metadata": {},
"source": [
"```\n",
"NOTE(cisaacstern): How well does GroupBy work on dask already and should we highlight that in the example?\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "652af54c",
"metadata": {},
"source": [
"# Discussion\n",
"\n",
"## Why\n",
"\n",
"<ul style=\"list-style-type:none;\">\n",
" <li>\n",
" 💡 Beam has a large built-in community, but poor deployment stories for HPC + AWS.\n",
" Dask can fill that gap!\n",
" </li>\n",
" <li>\n",
" 💡 The <a href=\"https://pangeo-forge.readthedocs.io/en/latest/\">Pangeo Forge</a>\n",
" community is motivated by this.\n",
" <a href=\"https://xarray-beam.readthedocs.io/en/latest/\">Xarray-Beam</a>\n",
" is another interesting use case.\n",
" </li>\n",
" <li>\n",
" 💡... more motivation here ...\n",
" </li>\n",
"</ul>\n",
"\n",
"## What works\n",
"\n",
"<ul style=\"list-style-type:none;\">\n",
" <li>✅ Beam pipelines can be compiled to run against various runners, including Dask!</li>\n",
" <li>✅ Existing support for elementwise operations (map, flatmap, filter, etc.)</li>\n",
" <li>✅ ...more existing successes here... </li>\n",
"</ul>\n",
"\n",
"## Next steps\n",
"\n",
"<ul style=\"list-style-type:none;\">\n",
" <li>⭕ Aggregations/reductions (frequencies, folds, etc.) are not implemented, but can be!</li>\n",
" <li>⭕ ... more discussion points here ...</li>\n",
" <li>⭕ ... more discussion points here ...</li>\n",
"</ul>\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "beamdaskdemo",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
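The notebook above generates and reads newline-delimited JSON (the JSON Lines format). As a standard-library-only sketch of that round trip, independent of the Dask and Beam installs the demo assumes, with made-up records for illustration:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical sample records shaped like the demo's generated people data
# (names and values here are invented for illustration).
records = [
    {"age": 47, "name": ["Deadra", "Landry"], "occupation": "Moneylender"},
    {"age": 26, "name": ["Shirleen", "Shaw"], "occupation": "Lawyer"},
]


def yield_jsonlines(fname):
    # Mirrors the helper in example.py: one JSON object per line.
    with open(fname) as f:
        for line in f.readlines():
            yield json.loads(line)


with tempfile.TemporaryDirectory() as td:
    path = Path(td) / "0.json"
    # JSON Lines: each record serialized on its own newline-terminated line.
    path.write_text("".join(json.dumps(r) + "\n" for r in records))
    loaded = list(yield_jsonlines(path))
```

Because each line is an independent JSON document, files in this format can be split and read in parallel, which is what makes them convenient inputs for both Dask Bag and Beam.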
30 changes: 30 additions & 0 deletions example.py
@@ -0,0 +1,30 @@
import glob
import json
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dask.dask_runner import DaskRunner


def yield_jsonlines(fname: str):
with open(fname) as f:
for line in f.readlines():
yield json.loads(line)


if __name__ == "__main__":
tmpdir, pipeline_options = sys.argv[1], sys.argv[2:]
opts = dict(runner=DaskRunner(), options=PipelineOptions(pipeline_options))
with beam.Pipeline(**opts) as p:
(
p
| beam.Create(glob.glob(f'{tmpdir}/*.json'))
| beam.FlatMap(yield_jsonlines)
| beam.Filter(lambda record: record['age'] > 30)
| beam.Filter(lambda record: record['name'][0].startswith('A'))
| beam.Filter(lambda record: record['name'][1].startswith('B'))
| beam.Filter(lambda record: record['occupation'].startswith('C'))
| beam.Map(lambda record: (" ".join(record['name']), record['age'], record['occupation']))
| beam.Map(print)
)
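Both the Dask Bag cell and the Beam pipeline above express the same predicate chain. For readers without Dask or Beam installed, here is a plain-Python sketch of that chain (the sample records are invented for illustration):

```python
import json

# Hypothetical JSON Lines input, shaped like the demo's generated data.
lines = [
    '{"age": 54, "name": ["August", "Bond"], "occupation": "Chimney Sweep"}',
    '{"age": 26, "name": ["Shirleen", "Shaw"], "occupation": "Lawyer"}',
    '{"age": 41, "name": ["Adan", "Bell"], "occupation": "Carpet Fitter"}',
]

records = (json.loads(line) for line in lines)

# The same predicates as the Dask Bag / Beam pipelines:
# age > 30, first name starts with A, surname with B, occupation with C.
matches = [
    (" ".join(r["name"]), r["age"], r["occupation"])
    for r in records
    if r["age"] > 30
    and r["name"][0].startswith("A")
    and r["name"][1].startswith("B")
    and r["occupation"].startswith("C")
]
```

The point of the demo is that this element-wise logic parallelizes trivially, so it already runs under Beam's `DaskRunner`, whereas the aggregations omitted above (`.frequencies`, `.topk`) require shuffles that are not yet implemented there.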