Skip to content

Commit

Permalink
Update top_wikipedia_sessions to be more idiomatic with beam.Map. (#32041)
Browse files Browse the repository at this point in the history

* Update top_wikipedia_sessions to be more idiomatic with beam.Map.

* lint
  • Loading branch information
robertwb authored Aug 4, 2024
1 parent bf42a81 commit 7e75087
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 20 deletions.
34 changes: 14 additions & 20 deletions sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@
MAX_TIMESTAMP = 0x7fffffffffffffff


class ExtractUserAndTimestampDoFn(beam.DoFn):
def extract_user_and_timestamp(element):
"""Extracts user and timestamp representing a Wikipedia edit."""
def process(self, element):
table_row = json.loads(element)
if 'contributor_username' in table_row:
user_name = table_row['contributor_username']
timestamp = table_row['timestamp']
yield TimestampedValue(user_name, timestamp)
table_row = json.loads(element)
if 'contributor_username' in table_row:
user_name = table_row['contributor_username']
timestamp = table_row['timestamp']
return TimestampedValue(user_name, timestamp)


class ComputeSessions(beam.PTransform):
Expand Down Expand Up @@ -98,19 +97,15 @@ def expand(self, pcoll):
without_defaults())


class SessionsToStringsDoFn(beam.DoFn):
def sessions_to_strings(element, window=beam.DoFn.WindowParam):
"""Adds the session information to be part of the key."""
def process(self, element, window=beam.DoFn.WindowParam):
yield (element[0] + ' : ' + str(window), element[1])
return (element[0] + ' : ' + str(window), element[1])


class FormatOutputDoFn(beam.DoFn):
def format_output(element, window=beam.DoFn.WindowParam):
"""Formats a string containing the user, count, and session."""
def process(self, element, window=beam.DoFn.WindowParam):
for kv in element:
session = kv[0]
count = kv[1]
yield session + ' : ' + str(count) + ' : ' + str(window)
for session, count in element:
yield session + ' : ' + str(count) + ' : ' + str(window)


class ComputeTopSessions(beam.PTransform):
Expand All @@ -124,14 +119,13 @@ def __init__(self, sampling_threshold):
def expand(self, pcoll):
return (
pcoll
|
'ExtractUserAndTimestamp' >> beam.ParDo(ExtractUserAndTimestampDoFn())
| 'ExtractUserAndTimestamp' >> beam.Map(extract_user_and_timestamp)
| beam.Filter(
lambda x: (abs(hash(x)) <= MAX_TIMESTAMP * self.sampling_threshold))
| ComputeSessions()
| 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn())
| 'SessionsToStrings' >> beam.Map(sessions_to_strings)
| TopPerMonth()
| 'FormatOutput' >> beam.ParDo(FormatOutputDoFn()))
| 'FormatOutput' >> beam.FlatMap(format_output))


def run(argv=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

# TODO: Unit test top_wikipedia_sessions.extract_user_and_timestamp, etc.


class ComputeTopSessionsTest(unittest.TestCase):

Expand Down

0 comments on commit 7e75087

Please sign in to comment.