feat: write recording summary events #15245

Merged
merged 46 commits into master from feat/write-recording-summary, May 9, 2023
The file diffs shown below are from 4 of the 46 commits.

46 commits:
a65131b  add transformer (pauldambra, Apr 25, 2023)
0302f4e  configurably write summaries to a new kafka topic (pauldambra, Apr 25, 2023)
81095e9  add tables to receive summary events (pauldambra, Apr 25, 2023)
e212b50  comment was already out of date (pauldambra, Apr 25, 2023)
c1a7064  first pass over comments (pauldambra, Apr 26, 2023)
5ee61aa  now with aggregation that fails tests (pauldambra, Apr 26, 2023)
d468b04  Merge branch 'master' into feat/write-recording-summary (pauldambra, Apr 26, 2023)
906b0c4  working as an aggregating table (pauldambra, Apr 26, 2023)
bb0fb16  fixes (pauldambra, Apr 26, 2023)
45a7fa9  make all the pip installs the same (pauldambra, Apr 26, 2023)
84f12ea  Change this file to try and prompt prettier in CI that it's ok (pauldambra, Apr 26, 2023)
beff2fb  Merge branch 'master' into feat/write-recording-summary (pauldambra, Apr 27, 2023)
f3e0b94  explicit first and last timestamp from event summaries (pauldambra, Apr 27, 2023)
bfaf0a2  Merge branch 'master' into feat/write-recording-summary (pauldambra, Apr 27, 2023)
60ced21  but not also broken (pauldambra, Apr 27, 2023)
ceac97c  even more less broken sql (pauldambra, Apr 27, 2023)
d067d51  Merge branch 'master' into feat/write-recording-summary (pauldambra, Apr 27, 2023)
a8ef254  correct test setup (pauldambra, Apr 27, 2023)
32d19ca  include duration when querying summary (pauldambra, Apr 27, 2023)
a49b91f  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 2, 2023)
020fca2  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 3, 2023)
9d77f79  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 3, 2023)
476d33a  switch to simple aggregate so we can partition by time (pauldambra, May 3, 2023)
5dfdf15  Update query snapshots (github-actions[bot], May 3, 2023)
5cab032  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 3, 2023)
94f64e0  feat: with activity milliseconds (#15366) (pauldambra, May 4, 2023)
54b36ce  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 4, 2023)
18f9ab6  don't release to all, env variables will override (pauldambra, May 4, 2023)
f05ccec  no need to find gaps (pauldambra, May 4, 2023)
7d78183  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 4, 2023)
e7e08d7  obey mypy (pauldambra, May 4, 2023)
bef878e  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 4, 2023)
dde2c76  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 4, 2023)
bda15ae  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 4, 2023)
b657765  wrap the new code in a try catch (pauldambra, May 5, 2023)
2e7786e  a little more softly softly catchy monkey (pauldambra, May 5, 2023)
922734e  need to more careful about first url (pauldambra, May 8, 2023)
495e4f7  with merge state on first url (pauldambra, May 8, 2023)
eaf5852  more commenting (pauldambra, May 8, 2023)
7869a6f  Update query snapshots (github-actions[bot], May 8, 2023)
db76ca5  Merge branch 'master' into feat/write-recording-summary (pauldambra, May 8, 2023)
81ba357  fix (pauldambra, May 8, 2023)
ac6578d  doh (pauldambra, May 8, 2023)
3473dac  Update posthog/clickhouse/client/execute.py (pauldambra, May 8, 2023)
1bff3e2  fix (pauldambra, May 8, 2023)
80e45eb  order by day (pauldambra, May 9, 2023)
6 changes: 4 additions & 2 deletions .run/Plugin Server.run.xml
@@ -10,9 +10,11 @@
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="KAFKA_HOSTS" value="localhost:9092" />
<env name="WORKER_CONCURRENCY" value="2" />
<env name="OBJECT_STORAGE_ENABLED" value="True" />
<env name="SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS" value="all" />
<env name="WORKER_CONCURRENCY" value="2" />
<env name="SESSION_RECORDING_BLOB_PROCESSING_TEAMS" value="all" />
</envs>
<method v="2" />
</configuration>
</component>
</component>
4 changes: 3 additions & 1 deletion plugin-server/src/config/config.ts
@@ -113,13 +113,15 @@ export function getDefaultConfig(): PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag

SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: Change this to 'all' when we release it fully
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined')
? 1024 // NOTE: ~100KB in dev or test, so that even with gzipped content we still flush pretty frequently
: 1024 * 50, // ~50MB after compression in prod
SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings',

SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: '', // TODO: Change this to 'all' when we release it fully
}
}

1 change: 1 addition & 0 deletions plugin-server/src/config/kafka-topics.ts
@@ -10,6 +10,7 @@ export const KAFKA_PERSON = `${prefix}clickhouse_person${suffix}`
export const KAFKA_PERSON_UNIQUE_ID = `${prefix}clickhouse_person_unique_id${suffix}`
export const KAFKA_PERSON_DISTINCT_ID = `${prefix}clickhouse_person_distinct_id${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION = `${prefix}events_plugin_ingestion${suffix}`
export const KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW = `${prefix}events_plugin_ingestion_overflow${suffix}`
@@ -3,6 +3,7 @@ import { HighLevelProducer as RdKafkaProducer, Message, NumberNullUndefined } fr

import {
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
KAFKA_PERFORMANCE_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
@@ -14,7 +15,11 @@ import { createKafkaProducer, disconnectProducer, flushProducer, produce } from
import { PipelineEvent, RawEventMessage, Team } from '../../../types'
import { KafkaConfig } from '../../../utils/db/hub'
import { status } from '../../../utils/status'
import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event'
import {
createPerformanceEvent,
createSessionRecordingEvent,
createSessionReplayEvent,
} from '../../../worker/ingestion/process-event'
import { TeamManager } from '../../../worker/ingestion/team-manager'
import { parseEventTimestamp } from '../../../worker/ingestion/timestamps'
import { eventDroppedCounter } from '../metrics'
@@ -25,12 +30,14 @@ export const startSessionRecordingEventsConsumer = async ({
consumerMaxBytes,
consumerMaxBytesPerPartition,
consumerMaxWaitMs,
summaryIngestionEnabledTeams,
}: {
teamManager: TeamManager
kafkaConfig: KafkaConfig
consumerMaxBytes: number
consumerMaxBytesPerPartition: number
consumerMaxWaitMs: number
summaryIngestionEnabledTeams: string
}) => {
/*
For Session Recordings we need to prepare the data for ClickHouse.
@@ -59,7 +66,13 @@ export const startSessionRecordingEventsConsumer = async ({

const connectionConfig = createRdConnectionConfigFromEnvVars(kafkaConfig)
const producer = await createKafkaProducer(connectionConfig)
const eachBatchWithContext = eachBatch({ teamManager, producer })

const eachBatchWithContext = eachBatch({
teamManager,
producer,
summaryEnabledTeams:
summaryIngestionEnabledTeams === 'all' ? null : summaryIngestionEnabledTeams.split(',').map(parseInt),
})

// Create a node-rdkafka consumer that fetches batches of messages, runs
// eachBatchWithContext, then commits offsets for the batch.
@@ -84,7 +97,15 @@ }
}

export const eachBatch =
({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
({
teamManager,
producer,
summaryEnabledTeams,
}: {
teamManager: TeamManager
producer: RdKafkaProducer
summaryEnabledTeams: number[] | null
}) =>
async (messages: Message[]) => {
// To start with, we simply process each message in turn,
// without attempting to perform any concurrency. There is a lot
@@ -105,7 +126,7 @@ export const eachBatch =
// DependencyUnavailableError error to distinguish between
// intermittent and permanent errors.
const pendingProduceRequests: Promise<NumberNullUndefined>[] = []
const eachMessageWithContext = eachMessage({ teamManager, producer })
const eachMessageWithContext = eachMessage({ teamManager, producer, summaryEnabledTeams })

for (const message of messages) {
const results = await retryOnDependencyUnavailableError(() => eachMessageWithContext(message))
@@ -148,7 +169,15 @@ }
}

const eachMessage =
({ teamManager, producer }: { teamManager: TeamManager; producer: RdKafkaProducer }) =>
({
teamManager,
producer,
summaryEnabledTeams,
}: {
teamManager: TeamManager
producer: RdKafkaProducer
summaryEnabledTeams: number[] | null
}) =>
async (message: Message) => {
// For each message, we:
//
@@ -252,14 +281,34 @@ const eachMessage =
event.properties || {}
)

return [
const producePromises = [
produce(
producer,
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
Buffer.from(JSON.stringify(clickHouseRecord)),
message.key ? Buffer.from(message.key) : null
),
]

if (summaryEnabledTeams === null || summaryEnabledTeams?.includes(team.id)) {
const replayRecord = createSessionReplayEvent(
messagePayload.uuid,
team.id,
messagePayload.distinct_id,
parseEventTimestamp(event as PluginEvent),
event.ip,
event.properties || {}
)
producePromises.push(
produce(
producer,
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
Buffer.from(JSON.stringify(replayRecord)),
message.key ? Buffer.from(message.key) : null
)
)
}
return producePromises
} else if (event.event === '$performance_event') {
const clickHouseRecord = createPerformanceEvent(
messagePayload.uuid,
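Condensed, the gating added across these hunks can be read as the following sketch. This is illustrative only, not code from the PR: the helper names are invented here, and the real consumer inlines the parsing where eachBatch is constructed.

// Illustrative sketch only (invented helper names).
// 'all' disables the allowlist: every team gets a summary event.
const parseSummaryEnabledTeams = (configValue: string): number[] | null =>
    configValue === 'all' ? null : configValue.split(',').map((id) => parseInt(id, 10))

// Each $snapshot message always produces to the session recording events topic;
// the session replay events topic is written only for enabled teams.
const shouldWriteReplaySummary = (summaryEnabledTeams: number[] | null, teamId: number): boolean =>
    summaryEnabledTeams === null || summaryEnabledTeams.includes(teamId)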
1 change: 1 addition & 0 deletions plugin-server/src/main/pluginsServer.ts
@@ -418,6 +418,7 @@ export async function startPluginsServer(
consumerMaxBytes: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES,
consumerMaxBytesPerPartition: serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION,
consumerMaxWaitMs: serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS,
summaryIngestionEnabledTeams: serverConfig.SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS,
})
stopSessionRecordingEventsConsumer = stop
joinSessionRecordingEventsConsumer = join
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
@@ -187,6 +187,8 @@ export interface PluginsServerConfig {
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number
SESSION_RECORDING_REMOTE_FOLDER: string

SESSION_RECORDING_SUMMARY_INGESTION_ENABLED_TEAMS: string
}

export interface Hub extends PluginsServerConfig {
73 changes: 73 additions & 0 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -278,6 +278,79 @@ export const createSessionRecordingEvent = (

return data
}

export interface SummarizedSessionRecordingEvent {
uuid: string
timestamp: string
team_id: number
distinct_id: string
session_id: string
window_id: string | undefined
created_at: string
url: string | undefined
click_count: number
keypress_count: number
mouse_activity_count: number
}

interface RRWebEventSummaryData {
href?: string
source?: number
}

interface RRWebEventSummary {
timestamp: number
type: number
data: RRWebEventSummaryData
}

export const createSessionReplayEvent = (
uuid: string,
team_id: number,
distinct_id: string,
timestamp: DateTime,
ip: string | null,
properties: Properties
) => {
const timestampString = castTimestampOrNow(timestamp, TimestampFormat.ClickHouse)

const eventsSummaries: RRWebEventSummary[] = properties['$snapshot_data']?.['events_summary'] || []
let clickCount = 0
let keypressCount = 0
let mouseActivity = 0
let url: string | undefined = undefined
eventsSummaries.forEach((eventSummary: RRWebEventSummary) => {
if (eventSummary.type === 3) {
mouseActivity += 1
if (eventSummary.data.source === 2) {
clickCount += 1
}
if (eventSummary.data.source === 5) {
keypressCount += 1
}
}
if (!!eventSummary.data.href?.trim().length && url === undefined) {
Contributor:

I never know if this is the case but isn't the missing ? after trim meaning that it could throw if there is no href?

Also why not use truthiness and keep it simple :D

Suggested change
if (!!eventSummary.data.href?.trim().length && url === undefined) {
if (!url && eventSummary.data.href?.trim()) {

Member Author:

The elvis operator says if the thing is there carry on if not return undefined


I prefer the explicit !! to remind myself I'm using truthiness but no strong opinion. I'll see what the tests think of the suggestion and go from there :)
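For reference, a minimal standalone sketch of the two checks being compared (illustrative only, not part of the diff): optional chaining short-circuits the whole member chain, so neither form throws when href is absent.

// Illustrative only, not part of the diff.
const summary: { href?: string } = {}
let url: string | undefined = undefined

// Original check: when href is undefined, `summary.href?.trim().length`
// short-circuits to undefined without calling trim(), so the condition is simply falsy.
if (!!summary.href?.trim().length && url === undefined) {
    url = summary.href
}

// Suggested check: equivalent for this purpose, relying on truthiness of the trimmed string.
if (!url && summary.href?.trim()) {
    url = summary.href
}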

url = eventSummary.data.href
}
})

const data: SummarizedSessionRecordingEvent = {
uuid,
team_id: team_id,
distinct_id: distinct_id,
session_id: properties['$session_id'],
window_id: properties['$window_id'],
timestamp: timestampString,
created_at: timestampString,
click_count: clickCount,
keypress_count: keypressCount,
mouse_activity_count: mouseActivity,
url: url,
}

return data
}

export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) {
const data: Partial<RawPerformanceEvent> = {
uuid,
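As a rough usage sketch of the new createSessionReplayEvent (hypothetical values, not taken from the PR or its tests; DateTime is the luxon DateTime already used in process-event.ts), this is what the function derives from a $snapshot event's events_summary:

// Hypothetical usage sketch: counts are derived from $snapshot_data.events_summary.
const replayRecord = createSessionReplayEvent(
    'some-uuid',
    2, // team_id
    'some-distinct-id',
    DateTime.now(), // luxon DateTime, as elsewhere in process-event.ts
    null, // ip
    {
        $session_id: 'session-1',
        $window_id: 'window-1',
        $snapshot_data: {
            events_summary: [
                { timestamp: 1682500000000, type: 3, data: { source: 2 } }, // mouse activity + click
                { timestamp: 1682500001000, type: 3, data: { source: 5 } }, // mouse activity + keypress
                { timestamp: 1682500002000, type: 4, data: { href: 'https://example.com/page' } }, // first url
            ],
        },
    }
)
// Expected result: click_count 1, keypress_count 1, mouse_activity_count 2,
// url 'https://example.com/page', session_id 'session-1', window_id 'window-1'.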
@@ -18,7 +18,7 @@ describe('session-recordings-consumer', () => {
beforeEach(() => {
postgres = new Pool({ connectionString: defaultConfig.DATABASE_URL })
teamManager = new TeamManager(postgres, {} as any)
eachBachWithDependencies = eachBatch({ producer, teamManager })
eachBachWithDependencies = eachBatch({ producer, teamManager, summaryEnabledTeams: [] })
})

afterEach(() => {
@@ -57,4 +57,70 @@ describe('session-recordings-consumer', () => {
// Should have send to the DLQ.
expect(producer.produce).toHaveBeenCalledTimes(1)
})

test('eachBatch emits to only one topic', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)

await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
timestamp: 123,
},
])

expect(producer.produce).toHaveBeenCalledTimes(1)
})

test('eachBatch can emit to two topics', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)

const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: null })

await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
timestamp: 123,
},
])

expect(producer.produce).toHaveBeenCalledTimes(2)
})

test('eachBatch can emit to two topics for a specific team', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)

const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId] })

await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
timestamp: 123,
},
])

expect(producer.produce).toHaveBeenCalledTimes(2)
})

test('eachBatch can emit to only one topic when team is not summary enabled', async () => {
const organizationId = await createOrganization(postgres)
const teamId = await createTeam(postgres, organizationId)

const eachBachWithDependencies: any = eachBatch({ producer, teamManager, summaryEnabledTeams: [teamId + 1] })

await eachBachWithDependencies([
{
key: 'test',
value: JSON.stringify({ team_id: teamId, data: JSON.stringify({ event: '$snapshot' }) }),
timestamp: 123,
},
])

expect(producer.produce).toHaveBeenCalledTimes(1)
})
})