feat: write recording summary events #15245

Merged: 46 commits into master from feat/write-recording-summary on May 9, 2023

Commits
a65131b
add transformer
pauldambra Apr 25, 2023
0302f4e
configurably write summaries to a new kafka topic
pauldambra Apr 25, 2023
81095e9
add tables to receive summary events
pauldambra Apr 25, 2023
e212b50
comment was already out of date
pauldambra Apr 25, 2023
c1a7064
first pass over comments
pauldambra Apr 26, 2023
5ee61aa
now with aggregation that fails tests
pauldambra Apr 26, 2023
d468b04
Merge branch 'master' into feat/write-recording-summary
pauldambra Apr 26, 2023
906b0c4
working as an aggregating table
pauldambra Apr 26, 2023
bb0fb16
fixes
pauldambra Apr 26, 2023
45a7fa9
make all the pip installs the same
pauldambra Apr 26, 2023
84f12ea
Change this file to try and prompt prettier in CI that it's ok
pauldambra Apr 26, 2023
beff2fb
Merge branch 'master' into feat/write-recording-summary
pauldambra Apr 27, 2023
f3e0b94
explicit first and last timestamp from event summaries
pauldambra Apr 27, 2023
bfaf0a2
Merge branch 'master' into feat/write-recording-summary
pauldambra Apr 27, 2023
60ced21
but not also broken
pauldambra Apr 27, 2023
ceac97c
even more less broken sql
pauldambra Apr 27, 2023
d067d51
Merge branch 'master' into feat/write-recording-summary
pauldambra Apr 27, 2023
a8ef254
correct test setup
pauldambra Apr 27, 2023
32d19ca
include duration when querying summary
pauldambra Apr 27, 2023
a49b91f
Merge branch 'master' into feat/write-recording-summary
pauldambra May 2, 2023
020fca2
Merge branch 'master' into feat/write-recording-summary
pauldambra May 3, 2023
9d77f79
Merge branch 'master' into feat/write-recording-summary
pauldambra May 3, 2023
476d33a
switch to simple aggregate so we can partition by time
pauldambra May 3, 2023
5dfdf15
Update query snapshots
github-actions[bot] May 3, 2023
5cab032
Merge branch 'master' into feat/write-recording-summary
pauldambra May 3, 2023
94f64e0
feat: with activity milliseconds (#15366)
pauldambra May 4, 2023
54b36ce
Merge branch 'master' into feat/write-recording-summary
pauldambra May 4, 2023
18f9ab6
don't release to all, env variables will override
pauldambra May 4, 2023
f05ccec
no need to find gaps
pauldambra May 4, 2023
7d78183
Merge branch 'master' into feat/write-recording-summary
pauldambra May 4, 2023
e7e08d7
obey mypy
pauldambra May 4, 2023
bef878e
Merge branch 'master' into feat/write-recording-summary
pauldambra May 4, 2023
dde2c76
Merge branch 'master' into feat/write-recording-summary
pauldambra May 4, 2023
bda15ae
Merge branch 'master' into feat/write-recording-summary
pauldambra May 4, 2023
b657765
wrap the new code in a try catch
pauldambra May 5, 2023
2e7786e
a little more softly softly catchy monkey
pauldambra May 5, 2023
922734e
need to more careful about first url
pauldambra May 8, 2023
495e4f7
with merge state on first url
pauldambra May 8, 2023
eaf5852
more commenting
pauldambra May 8, 2023
7869a6f
Update query snapshots
github-actions[bot] May 8, 2023
db76ca5
Merge branch 'master' into feat/write-recording-summary
pauldambra May 8, 2023
81ba357
fix
pauldambra May 8, 2023
ac6578d
doh
pauldambra May 8, 2023
3473dac
Update posthog/clickhouse/client/execute.py
pauldambra May 8, 2023
1bff3e2
fix
pauldambra May 8, 2023
80e45eb
order by day
pauldambra May 9, 2023
16 changes: 5 additions & 11 deletions .github/workflows/ci-backend.yml
@@ -117,8 +117,7 @@ jobs:
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
cd current
python -m pip install -r requirements-dev.txt
python -m pip install -r requirements.txt
python -m pip install -r requirements.txt -r requirements-dev.txt

- name: Check for syntax errors, import sort, and code style violations
run: |
@@ -199,8 +198,7 @@ jobs:
- name: Install python dependencies
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
python -m pip install -r requirements-dev.txt
python -m pip install -r requirements.txt
python -m pip install -r requirements.txt -r requirements-dev.txt

- uses: actions/checkout@v3
with:
@@ -210,8 +208,7 @@
run: |
# We need to ensure we have requirements for the master branch
# now also, so we can run migrations up to master.
python -m pip install -r requirements-dev.txt
python -m pip install -r requirements.txt
python -m pip install -r requirements.txt -r requirements-dev.txt
python manage.py migrate

- uses: actions/checkout@v3
@@ -356,8 +353,7 @@ jobs:
- name: Install python dependencies
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
run: |
python -m pip install -r master/requirements-dev.txt
python -m pip install -r master/requirements.txt
python -m pip install -r master/requirements.txt -r master/requirements-dev.txt

- name: Wait for Clickhouse & Kafka
run: master/bin/check_kafka_clickhouse_up
@@ -376,9 +372,7 @@
- name: Install requirements.txt dependencies with pip at current branch
run: |
cd current
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install freezegun fakeredis pytest pytest-mock pytest-django syrupy
python -m pip install -r requirements.txt -r requirements-dev.txt

- name: Link posthog-cloud at current branch
run: |
1 change: 1 addition & 0 deletions .github/workflows/pr-deploy.yml
@@ -61,6 +61,7 @@ jobs:
export RELEASE_NAME=posthog
export NAMESPACE=pr-$PR_NUM-${BRANCH_NAME//\//-}
export NAMESPACE=${NAMESPACE:0:38}
export NAMESPACE=${NAMESPACE%%-}
export HOSTNAME=$NAMESPACE
export TAILNET_NAME=hedgehog-kitefin
export TS_AUTHKEY=${{ secrets.TAILSCALE_SERVICE_AUTHKEY }}
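The added `export NAMESPACE=${NAMESPACE%%-}` line strips a trailing dash that the 38-character truncation can leave behind; a name ending in `-` is not a valid DNS-1123 label, and the same value is reused as the hostname. A minimal TypeScript sketch of the same logic, with a hypothetical branch name chosen so the cut lands on a dash:

```typescript
// Sketch of the three shell exports above: build pr-<num>-<branch>, cap at 38 chars,
// then drop a single trailing '-' (what ${NAMESPACE%%-} does).
const buildNamespace = (prNum: number, branchName: string): string => {
    const raw = `pr-${prNum}-${branchName.replace(/\//g, '-')}`
    return raw.slice(0, 38).replace(/-$/, '')
}

// Hypothetical branch name, for illustration only:
buildNamespace(15245, 'feat/write-recording-summary-events-v2') // => 'pr-15245-feat-write-recording-summary'
```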
3 changes: 2 additions & 1 deletion .run/Plugin Server.run.xml
@@ -13,7 +13,8 @@
<env name="OBJECT_STORAGE_ENABLED" value="True" />
<env name="WORKER_CONCURRENCY" value="2" />
<env name="SESSION_RECORDING_BLOB_PROCESSING_TEAMS" value="all" />
<env name="SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS" value="all" />
</envs>
<method v="2" />
</configuration>
</component>
</component>
frontend/src/scenes/session-recordings/player/utils/segmenter.ts
@@ -2,6 +2,14 @@ import { EventType, IncrementalSource, eventWithTime } from '@rrweb/types'
import { Dayjs } from 'lib/dayjs'
import { RecordingSegment, RecordingSnapshot } from '~/types'

/**
* This file is copied into the plugin server to calculate activeMilliseconds on ingestion
* plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts
*
* Changes here should be reflected there
* TODO add code sharing between plugin-server and front-end so that this duplication is unnecessary
*/

const activeSources = [
IncrementalSource.MouseMove,
IncrementalSource.MouseInteraction,
4 changes: 3 additions & 1 deletion plugin-server/src/config/config.ts
@@ -113,13 +113,15 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag

SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined')
? 1024 // NOTE: ~1MB in dev or test, so that even with gzipped content we still flush pretty frequently
: 1024 * 50, // ~50MB after compression in prod
SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings',

SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: '', // TODO: Change this to 'all' when we release it fully
}
}

1 change: 1 addition & 0 deletions plugin-server/src/config/kafka-topics.ts
@@ -10,6 +10,7 @@ export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}`
export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}`
export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW = `${prefix}events_plugin_ingestion_overflow${suffix}`
@@ -10,8 +10,7 @@ import { instrumentEachBatch, setupEventHandlers } from './kafka-queue'
import { latestOffsetTimestampGauge } from './metrics'

// The valid task types that can be scheduled.
// TODO: not sure if there is another place that defines these but it would be
// good to unify.
// TODO: not sure if there is another place that defines these but it would be good to unify.
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const

export const startScheduledTasksConsumer = async ({
plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -1,8 +1,10 @@
import { PluginEvent } from '@posthog/plugin-scaffold'
import { captureException } from '@sentry/node'
import { HighLevelProducer as RdKafkaProducer, Message, NumberNullUndefined } from 'node-rdkafka-acosom'

import {
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
KAFKA_PERFORMANCE_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
@@ -14,7 +16,12 @@ import { createKafkaProducer, disconnectProducer, flushProducer, produce } from
import { PipelineEvent, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event'
import {
createPerformanceEvent,
createSessionRecordingEvent,
createSessionReplayEvent,
SummarizedSessionRecordingEvent,
} from '../../../worker/ingestion/process-event'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../../worker/ingestion/timestamps'
import { eventDroppedCounter } from '../metrics'
@@ -25,12 +32,14 @@ export const startSessionRecordingEventsConsumer = async ({
consumerMaxBytes,
consumerMaxBytesPerPartition,
consumerMaxWaitMs,
summaryIngestionEnabledTeams,
}: {
teamManager: TeamManager
kafkaConfig: KafkaConfig
consumerMaxBytes: number
consumerMaxBytesPerPartition: number
consumerMaxWaitMs: number
summaryIngestionEnabledTeams: string
}) => {
/*
For Session Recordings we need to prepare the data for ClickHouse.
@@ -59,7 +68,13 @@ export const startSessionRecordingEventsConsumer = async ({

const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig)
const producer = await createKafkaProducer(connectionConfig)
const eachBatchWithContext = eachBatch({ teamManager, producer })

const eachBatchWithContext = eachBatch({
teamManager,
producer,
summaryEnabledTeams:
summaryIngestionEnabledTeams === 'all' ? null : summaryIngestionEnabledTeams.split(',').map(parseInt),
})

// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
@@ -84,7 +99,15 @@
}

export const eachBatch =
({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
({
teamManager,
producer,
summaryEnabledTeams,
}: {
teamManager: TeamManager
producer: RdKafkaProducer
summaryEnabledTeams: number[] | null
}) =>
async (messages: Message[]) => {
// To start with, we simply process each message in turn,
// without attempting to perform any concurrency. There is a lot
@@ -105,7 +128,7 @@ export const eachBatch =
// DependencyUnavailableError error to distinguish between
// intermittent and permanent errors.
const pendingProduceRequests: Promise<NumberNullUndefined>[] = []
const eachMessageWithContext = eachMessage({ teamManager, producer })
const eachMessageWithContext = eachMessage({ teamManager, producer, summaryEnabledTeams })

for (const message of messages) {
const results = await retryOnDependencyUnavailableError(() => eachMessageWithContext(message))
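The comments above describe the retry contract: `retryOnDependencyUnavailableError` retries a message when a dependency is only temporarily down, and lets every other error propagate as a permanent failure. A hedged sketch of that pattern follows; the real helper's signature, backoff, and retry limit may differ.

```typescript
// Sketch only: retry intermittent dependency failures, rethrow permanent errors.
class DependencyUnavailableError extends Error {}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

async function retryOnDependencyUnavailableError<T>(fn: () => Promise<T>, delayMs = 1000): Promise<T> {
    while (true) {
        try {
            return await fn()
        } catch (error) {
            if (error instanceof DependencyUnavailableError) {
                // Intermittent: the dependency should come back, so wait and try again
                // rather than dropping the message or crashing the consumer.
                await sleep(delayMs)
                continue
            }
            // Permanent: let the error propagate so the batch fails visibly.
            throw error
        }
    }
}
```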
@@ -148,7 +171,15 @@
}

const eachMessage =
({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
({
teamManager,
producer,
summaryEnabledTeams,
}: {
teamManager: TeamManager
producer: RdKafkaProducer
summaryEnabledTeams: number[] | null
}) =>
async (message: Message) => {
// For each message, we:
//
@@ -252,14 +283,42 @@ const eachMessage =
event.properties || {}
)

return [
let replayRecord: null | SummarizedSessionRecordingEvent = null
try {
if (summaryEnabledTeams === null || summaryEnabledTeams?.includes(team.id)) {
replayRecord = createSessionReplayEvent(
messagePayload.uuid,
team.id,
messagePayload.distinct_id,
event.ip,
event.properties || {}
)
}
} catch (e) {
status.warn('??', 'session_replay_summarizer_error', { error: e })
captureException(e)
}

const producePromises = [
produce({
producer,
topic: KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
value: Buffer.from(JSON.stringify(clickHouseRecord)),
key: message.key ? Buffer.from(message.key) : null,
}),
]

if (replayRecord) {
producePromises.push(
produce({
producer,
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
value: Buffer.from(JSON.stringify(replayRecord)),
key: message.key ? Buffer.from(message.key) : null,
})
)
}
return producePromises
} else if (event.event === '$performance_event') {
const clickHouseRecord = createPerformanceEvent(
messagePayload.uuid,
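One detail worth noting in the allow-list wiring above (`summaryIngestionEnabledTeams.split(',').map(parseInt)`): `Array.prototype.map` passes the element index to `parseInt` as the radix, so a value like `'2,3,4'` parses to `[2, NaN, NaN]` and only the first team id survives. A hedged sketch of the intended `'all'` / comma-list handling with an explicit radix, illustrative only and not the code in this PR:

```typescript
// Illustrative sketch: parse SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS into
// either null ("all teams") or an explicit list of numeric team ids.
const parseEnabledTeams = (summaryIngestionEnabledTeams: string): number[] | null => {
    if (summaryIngestionEnabledTeams === 'all') {
        return null // null means "no filter" downstream: every team gets summary events
    }
    return summaryIngestionEnabledTeams
        .split(',')
        .map((id) => parseInt(id, 10)) // explicit radix avoids the map(parseInt) index-as-radix pitfall
        .filter((id) => !Number.isNaN(id)) // '' (the default) yields an empty list, i.e. disabled
}

parseEnabledTeams('all') // => null
parseEnabledTeams('2,3,4') // => [2, 3, 4]
parseEnabledTeams('') // => []
```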
plugin-server/src/main/ingestion-queues/session-recording/snapshot-segmenter.ts
@@ -0,0 +1,111 @@
/**
* This file is a cut-down version of the segmenter.ts
* https://github.com/PostHog/posthog/blob/db2deaf650d2eca9addba5e3f304a17a21041f25/frontend/src/scenes/session-recordings/player/utils/segmenter.ts
*
* It has been modified to not need the same dependencies
* Any changes may need to be sync'd between the two
*/

const activeSources = [1, 2, 3, 4, 5, 6, 7, 12]

const ACTIVITY_THRESHOLD_MS = 5000

export interface RRWebEventSummaryData {
href?: string
source?: number
payload?: Record<string, any>
}

export interface RRWebEventSummary {
timestamp: number
type: number
data: RRWebEventSummaryData
windowId: string
}

interface RecordingSegment {
kind: 'window' | 'buffer' | 'gap'
startTimestamp: number // Epoch time that the segment starts
endTimestamp: number // Epoch time that the segment ends
durationMs: number
windowId?: string
isActive: boolean
}

const isActiveEvent = (event: RRWebEventSummary): boolean => {
return event.type === 3 && activeSources.includes(event.data?.source || -1)
}

const createSegments = (snapshots: RRWebEventSummary[]): RecordingSegment[] => {
let segments: RecordingSegment[] = []
let activeSegment!: Partial<RecordingSegment>
let lastActiveEventTimestamp = 0

snapshots.forEach((snapshot) => {
const eventIsActive = isActiveEvent(snapshot)
lastActiveEventTimestamp = eventIsActive ? snapshot.timestamp : lastActiveEventTimestamp

// When do we create a new segment?
// 1. If we don't have one yet
let isNewSegment = !activeSegment

// 2. If it is currently inactive but a new "active" event comes in
if (eventIsActive && !activeSegment?.isActive) {
isNewSegment = true
}

// 3. If it is currently active but no new active event has been seen for the activity threshold
if (activeSegment?.isActive && lastActiveEventTimestamp + ACTIVITY_THRESHOLD_MS < snapshot.timestamp) {
isNewSegment = true
}

// 4. If windowId changes we create a new segment
if (activeSegment?.windowId !== snapshot.windowId) {
isNewSegment = true
}

if (isNewSegment) {
if (activeSegment) {
segments.push(activeSegment as RecordingSegment)
}

activeSegment = {
kind: 'window',
startTimestamp: snapshot.timestamp,
windowId: snapshot.windowId,
isActive: eventIsActive,
}
}

activeSegment.endTimestamp = snapshot.timestamp
})

if (activeSegment) {
segments.push(activeSegment as RecordingSegment)
}

segments = segments.map((segment) => {
// These can all be done in a loop at the end...
segment.durationMs = segment.endTimestamp - segment.startTimestamp
return segment
})

return segments
}

/**
* TODO add code sharing between plugin-server and front-end so that this method can
* call the same createSegments function as the front-end
*/
export const activeMilliseconds = (snapshots: RRWebEventSummary[]): number => {
const segments = createSegments(snapshots)
return segments.reduce((acc, segment) => {
if (segment.isActive) {
// if the segment is active but has no duration we count it as 1ms
// to distinguish it from segments with no activity at all
return acc + Math.max(1, segment.durationMs)
}

return acc
}, 0)
}
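A small usage sketch of the new helper with hand-written snapshots; the timestamps, window id, and relative import path are illustrative, and source `2` is one of the active sources listed above.

```typescript
import { activeMilliseconds, RRWebEventSummary } from './snapshot-segmenter'

// Two active events 2s apart, then a quiet gap longer than ACTIVITY_THRESHOLD_MS (5s)
// followed by an inactive event, all in the same window.
const snapshots: RRWebEventSummary[] = [
    { timestamp: 1_000, type: 3, data: { source: 2 }, windowId: 'w1' }, // active
    { timestamp: 3_000, type: 3, data: { source: 2 }, windowId: 'w1' }, // extends the same active segment
    { timestamp: 13_000, type: 3, data: { source: 0 }, windowId: 'w1' }, // inactive, starts a new segment
]

// The first two events form one active segment of 2000ms; the trailing inactive
// segment contributes nothing, so the total active time is 2000ms.
activeMilliseconds(snapshots) // => 2000
```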
1 change: 1 addition & 0 deletions plugin-server/src/main/pluginsServer.ts
@@ -414,6 +414,7 @@ export async function startPluginsServer(
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
summaryIngestionEnabledTeams: serverConfig.SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS,
})
stopSessionRecordingEventsConsumer = stop
joinSessionRecordingEventsConsumer = join
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
@@ -188,6 +188,8 @@ export interface PluginsServerConfig {
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number
SESSION_RECORDING_REMOTE_FOLDER: string

SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: string
}

export interface Hub extends PluginsServerConfig {