enhance(apps/analytics): add scripts for the computation of aggregated analytics (#4385)
sjschlapbach authored Dec 4, 2024
1 parent 4ca93c7 commit 76c2300
Showing 7 changed files with 367 additions and 0 deletions.
1 change: 1 addition & 0 deletions apps/analytics/src/modules/__init__.py
@@ -1 +1,2 @@
from .participant_analytics import compute_correctness, get_participant_responses
from .aggregated_analytics import compute_aggregated_analytics
4 changes: 4 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/__init__.py
@@ -0,0 +1,4 @@
from .compute_aggregated_analytics import compute_aggregated_analytics
from .load_participant_analytics import load_participant_analytics
from .aggregate_participant_analytics import aggregate_participant_analytics
from .save_aggregated_analytics import save_aggregated_analytics
29 changes: 29 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/aggregate_participant_analytics.py
@@ -0,0 +1,29 @@
def aggregate_participant_analytics(df_participant_analytics, verbose=False):
    # if the dataframe is empty, return None
    if df_participant_analytics.empty:
        if verbose:
            print("No participant analytics to aggregate")

        return None

    # aggregate all participant analytics for the specified time range and separate courses
    df_aggregated_analytics = (
        df_participant_analytics.groupby("courseId")
        .agg(
            {
                "id": "count",
                "responseCount": "sum",
                "totalScore": "sum",
                "totalPoints": "sum",
                "totalXp": "sum",
            }
        )
        .reset_index()
        .rename(
            columns={
                "id": "participantCount",
            }
        )
    )

    return df_aggregated_analytics
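
For reference, a minimal sketch (not part of the commit) of how this aggregation behaves on a toy DataFrame; the values and the import path are illustrative assumptions:

import pandas as pd

from src.modules.aggregated_analytics import aggregate_participant_analytics

# three participant analytics rows across two courses
df = pd.DataFrame(
    {
        "id": ["a1", "a2", "a3"],
        "courseId": ["c1", "c1", "c2"],
        "responseCount": [10, 5, 3],
        "totalScore": [100, 50, 30],
        "totalPoints": [80, 40, 20],
        "totalXp": [200, 100, 60],
    }
)

print(aggregate_participant_analytics(df))
#   courseId  participantCount  responseCount  totalScore  totalPoints  totalXp
# 0       c1                 2             15         150          120      300
# 1       c2                 1              3          30           20       60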
34 changes: 34 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/compute_aggregated_analytics.py
@@ -0,0 +1,34 @@
from .load_participant_analytics import load_participant_analytics
from .aggregate_participant_analytics import aggregate_participant_analytics
from .save_aggregated_analytics import save_aggregated_analytics


def compute_aggregated_analytics(
    db, start_date, end_date, timestamp, analytics_type="DAILY", verbose=False
):
    # load all participant analytics for the given timestamp and analytics time range
    df_participant_analytics = load_participant_analytics(
        db, timestamp, analytics_type, verbose
    )

    # aggregate all participant analytics values by course
    df_aggregated_analytics = aggregate_participant_analytics(
        df_participant_analytics, verbose
    )

    if df_aggregated_analytics is not None and verbose:
        print(f"Aggregated analytics for time range: {start_date} to {end_date}")
        print(df_aggregated_analytics.head())
    elif df_aggregated_analytics is None:
        print(
            f"No aggregated analytics to compute for time range: {start_date} to {end_date}"
        )

    # store the computed aggregated analytics in the database
    if df_aggregated_analytics is not None:
        save_aggregated_analytics(db, df_aggregated_analytics, timestamp, analytics_type)
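
A minimal usage sketch (assuming a connected Prisma client `db` and existing ParticipantAnalytics rows for the given timestamp; the date is an illustrative assumption):

# aggregate the DAILY participant analytics of 1 Dec 2024 into one
# AggregatedAnalytics row per course
day_start = "2024-12-01T00:00:00.000Z"
day_end = "2024-12-01T23:59:59.999Z"
compute_aggregated_analytics(db, day_start, day_end, day_start, "DAILY", verbose=True)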
30 changes: 30 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/load_participant_analytics.py
@@ -0,0 +1,30 @@
import pandas as pd


def convert_to_df(analytics):
    # convert the database query result into a pandas dataframe
    rows = []
    for item in analytics:
        rows.append(dict(item))

    return pd.DataFrame(rows)


def load_participant_analytics(db, timestamp, analytics_type, verbose=False):
    participant_analytics = db.participantanalytics.find_many(
        where={"timestamp": timestamp, "type": analytics_type},
    )

    if verbose:
        # print the number of matching analytics and the first entry
        print(
            "Found {} {} analytics for timestamp {}".format(
                len(participant_analytics), analytics_type, timestamp
            )
        )
        if participant_analytics:
            print(participant_analytics[0])

    # convert the analytics to a dataframe
    df_loaded_analytics = convert_to_df(participant_analytics)

    return df_loaded_analytics
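
As a quick illustration of `convert_to_df`, a sketch with plain dicts standing in for Prisma result objects (`dict(item)` works on both):

records = [
    {"id": "a1", "courseId": "c1", "responseCount": 10},
    {"id": "a2", "courseId": "c1", "responseCount": 5},
]
print(convert_to_df(records))
#    id courseId  responseCount
# 0  a1       c1             10
# 1  a2       c1              5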
108 changes: 108 additions & 0 deletions apps/analytics/src/modules/aggregated_analytics/save_aggregated_analytics.py
@@ -0,0 +1,108 @@
from datetime import datetime


def save_aggregated_analytics(db, df_analytics, timestamp, analytics_type="DAILY"):
    computedAt = datetime.now().strftime("%Y-%m-%d") + "T00:00:00.000Z"

    # create daily / weekly / monthly analytics entries for all participants
    if analytics_type in ["DAILY", "WEEKLY", "MONTHLY"]:
        for _, row in df_analytics.iterrows():
            db.aggregatedanalytics.upsert(
                where={
                    "type_courseId_timestamp": {
                        "type": analytics_type,
                        "courseId": row["courseId"],
                        "timestamp": timestamp,
                    }
                },
                data={
                    "create": {
                        "type": analytics_type,
                        "timestamp": timestamp,
                        "computedAt": computedAt,
                        "participantCount": row["participantCount"],
                        "responseCount": row["responseCount"],
                        "totalScore": row["totalScore"],
                        "totalPoints": row["totalPoints"],
                        "totalXp": row["totalXp"],
                        # TODO: set this value correctly for rolling updates in production code
                        # (cannot be computed for past learning analytics -> therefore set to invalid value)
                        "totalElementsAvailable": -1,
                        "course": {"connect": {"id": row["courseId"]}},
                    },
                    "update": {},
                },
            )

    # create or update course-wide analytics entries (unique per course by definition)
    elif analytics_type == "COURSE":
        for _, row in df_analytics.iterrows():
            course = db.course.find_unique_or_raise(
                where={"id": row["courseId"]},
                include={
                    "practiceQuizzes": {
                        "include": {
                            "stacks": {
                                "include": {"elements": True},
                            }
                        }
                    },
                    "microLearnings": {
                        "include": {
                            "stacks": {
                                "include": {"elements": True},
                            }
                        }
                    },
                },
            )
            course = dict(course)

            # sum the number of elements over all practice quizzes and microlearnings
            totalElementsAvailable = 0
            for practice_quiz in course["practiceQuizzes"]:
                pq_dict = dict(practice_quiz)
                for stack in pq_dict["stacks"]:
                    stack_dict = dict(stack)
                    totalElementsAvailable += len(stack_dict["elements"])
            for microlearning in course["microLearnings"]:
                ml_dict = dict(microlearning)
                for stack in ml_dict["stacks"]:
                    stack_dict = dict(stack)
                    totalElementsAvailable += len(stack_dict["elements"])

            db.aggregatedanalytics.upsert(
                where={
                    "type_courseId_timestamp": {
                        "type": analytics_type,
                        "courseId": row["courseId"],
                        "timestamp": timestamp,
                    }
                },
                data={
                    "create": {
                        "type": analytics_type,
                        "timestamp": timestamp,
                        "computedAt": computedAt,
                        "participantCount": row["participantCount"],
                        "responseCount": row["responseCount"],
                        "totalScore": row["totalScore"],
                        "totalPoints": row["totalPoints"],
                        "totalXp": row["totalXp"],
                        "totalElementsAvailable": totalElementsAvailable,
                        "course": {"connect": {"id": row["courseId"]}},
                    },
                    "update": {
                        "computedAt": computedAt,
                        "participantCount": row["participantCount"],
                        "responseCount": row["responseCount"],
                        "totalScore": row["totalScore"],
                        "totalPoints": row["totalPoints"],
                        "totalXp": row["totalXp"],
                        "totalElementsAvailable": totalElementsAvailable,
                    },
                },
            )

    else:
        raise ValueError("Unknown analytics type: {}".format(analytics_type))
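
A note on re-runs, with a sketch (the `df_agg` variable and timestamps are illustrative assumptions): for the period types the upsert's "update" payload is empty, so saving the same (type, courseId, timestamp) combination twice writes nothing new, while COURSE entries are refreshed on every run.

timestamp = "2024-12-01T00:00:00.000Z"
save_aggregated_analytics(db, df_agg, timestamp, "DAILY")
save_aggregated_analytics(db, df_agg, timestamp, "DAILY")  # no-op: entry already exists
save_aggregated_analytics(db, df_agg, "1970-01-01T00:00:00.000Z", "COURSE")  # refreshed in place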
161 changes: 161 additions & 0 deletions apps/analytics/src/notebooks/aggregated_analytics.ipynb
@@ -0,0 +1,161 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Preparation\n",
"\n",
"This script computes analytics that are aggregated over all participants in a course over a specified time span. The corresponding results are stored in the `AggregatedAnalytics` database table. While the daily, weekly and monthly scripts are designed to run at the end of the corresponding period (only creation, no updates), the course analytics are once more meant to be updated on a regular basis (daily?). Minor changes to the calling logic of these functions will be required for continuous updates."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"from datetime import datetime\n",
"from prisma import Prisma\n",
"import pandas as pd\n",
"\n",
"# set the python path correctly for module imports to work\n",
"import sys\n",
"sys.path.append('../../')\n",
"\n",
"from src.modules.aggregated_analytics.compute_aggregated_analytics import compute_aggregated_analytics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"db = Prisma()\n",
"\n",
"# set the environment variable DATABASE_URL to the connection string of your database\n",
"os.environ['DATABASE_URL'] = 'postgresql://klicker:klicker@localhost:5432/klicker-prod'\n",
"\n",
"db.connect()\n",
"\n",
"# Script settings\n",
"verbose = False\n",
"\n",
"# Settings which analytics to compute\n",
"compute_daily = True\n",
"compute_weekly = True\n",
"compute_monthly = True\n",
"compute_course = True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Compute Aggregated Analytics on Course Level\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_date = \"2021-01-01\"\n",
"end_date = datetime.now().strftime(\"%Y-%m-%d\")\n",
"date_range_daily = pd.date_range(start=start_date, end=end_date, freq=\"D\")\n",
"date_range_weekly = pd.date_range(start=start_date, end=end_date, freq=\"W\")\n",
"date_range_monthly = pd.date_range(start=start_date, end=end_date, freq=\"ME\")\n",
"\n",
"if compute_daily:\n",
" # Iterate over the date range and compute the participant analytics for each day\n",
" for curr_date in date_range_daily:\n",
" # determine day start and end dates required for aggregation\n",
" specific_date = curr_date.strftime(\"%Y-%m-%d\")\n",
" day_start = specific_date + \"T00:00:00.000Z\"\n",
" day_end = specific_date + \"T23:59:59.999Z\"\n",
" print(f\"Computing daily aggregated analytics (course) for {specific_date}\")\n",
"\n",
" # compute aggregated analytics for a specific day\n",
" timestamp = day_start\n",
" compute_aggregated_analytics(\n",
" db, day_start, day_end, timestamp, \"DAILY\", verbose\n",
" )\n",
"\n",
"\n",
"if compute_weekly:\n",
" # Iterate over the date range and compute the participant analytics for each week\n",
" for curr_date in date_range_weekly:\n",
" # determine week start and end dates required for aggregation\n",
" week_end = curr_date.strftime(\"%Y-%m-%d\") + \"T23:59:59.999Z\"\n",
" week_start = (curr_date - pd.DateOffset(days=6)).strftime(\n",
" \"%Y-%m-%d\"\n",
" ) + \"T00:00:00.000Z\"\n",
" print(\n",
" f\"Computing weekly aggregated analytics (course) for {week_start } to {week_end }\"\n",
" )\n",
"\n",
" # compute aggregated analytics for a specific week\n",
" timestamp = week_end\n",
" compute_aggregated_analytics(\n",
" db, week_start, week_end, timestamp, \"WEEKLY\", verbose\n",
" )\n",
"\n",
"\n",
"if compute_monthly:\n",
" # Iterate over the date range and compute the participant analytics for each month\n",
" for curr_date in date_range_monthly:\n",
" # determine month start and end dates required for aggregation\n",
" month_end = curr_date.strftime('%Y-%m-%d') + 'T23:59:59.999Z'\n",
" month_start = (curr_date - pd.offsets.MonthBegin(1)).strftime('%Y-%m-%d') + 'T00:00:00.000Z'\n",
" print(\n",
" f\"Computing monthly aggregated analytics (course) for {month_start } to {month_end }\"\n",
" )\n",
"\n",
" # compute aggregated analytics for a specific month\n",
" timestamp = month_end\n",
" compute_aggregated_analytics(\n",
" db, month_start, month_end, timestamp, \"MONTHLY\", verbose\n",
" )\n",
"\n",
"\n",
"if compute_course:\n",
" print(\n",
" f\"Computing course-wide aggregated analytics\"\n",
" )\n",
"\n",
" # compute aggregated analytics over entire course based on corresponding participant analytics\n",
" # (a constant timestamp is used here, since the data combination has to be unique \n",
" # during querying, but only one entry per course is available by definition)\n",
" timestamp = \"1970-01-01T00:00:00.000Z\"\n",
" compute_aggregated_analytics(\n",
" db, timestamp, timestamp, timestamp, \"COURSE\", verbose\n",
" )"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "analytics-fkWWeYLw-py3.12",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
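
For reference, a small sketch (not part of the commit) of how the weekly window arithmetic in the notebook resolves: pandas' freq="W" yields Sunday-anchored week ends, and subtracting six days gives the corresponding Monday start.

import pandas as pd

for week_end in pd.date_range(start="2021-01-01", end="2021-01-31", freq="W"):
    week_start = week_end - pd.DateOffset(days=6)
    print(week_start.strftime("%Y-%m-%d"), "->", week_end.strftime("%Y-%m-%d"))
# 2020-12-28 -> 2021-01-03
# 2021-01-04 -> 2021-01-10
# ...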
