Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/PostHog #1563

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING-ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Flowise 支持不同的环境变量来配置您的实例。您可以在 `package
| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| SECRETKEY_PATH | 保存加密密钥(用于加密/解密凭据)的位置 | 字符串 | `your-path/Flowise/packages/server` |
| FLOWISE_SECRETKEY_OVERWRITE | 加密密钥用于替代存储在 SECRETKEY_PATH 中的密钥 | 字符串 |
| DISABLE_FLOWISE_TELEMETRY | 关闭遥测 | 字符串 |

您也可以在使用 `npx` 时指定环境变量。例如:

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Flowise support different environment variables to configure your instance. You
| DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false |
| SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` |
| FLOWISE_SECRETKEY_OVERWRITE | Encryption key to be used instead of the key stored in SECRETKEY_PATH | String |
| DISABLE_FLOWISE_TELEMETRY | Turn off telemetry | Boolean |

You can also specify the env variables when using `npx`. For example:

Expand Down
4 changes: 3 additions & 1 deletion docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ LOG_PATH=/root/.flowise/logs
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
# LANGCHAIN_API_KEY=your_api_key
# LANGCHAIN_PROJECT=your_project
# LANGCHAIN_PROJECT=your_project

# DISABLE_FLOWISE_TELEMETRY=true
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ services:
- FLOWISE_SECRETKEY_OVERWRITE=${FLOWISE_SECRETKEY_OVERWRITE}
- LOG_LEVEL=${LOG_LEVEL}
- LOG_PATH=${LOG_PATH}
- DISABLE_FLOWISE_TELEMETRY=${DISABLE_FLOWISE_TELEMETRY}
ports:
- '${PORT}:${PORT}'
volumes:
Expand Down
2 changes: 2 additions & 0 deletions packages/server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ PORT=3000
# LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
# LANGCHAIN_API_KEY=your_api_key
# LANGCHAIN_PROJECT=your_project

# DISABLE_FLOWISE_TELEMETRY=true
1 change: 1 addition & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"multer": "^1.4.5-lts.1",
"mysql": "^2.18.1",
"pg": "^8.11.1",
"posthog-node": "^3.5.0",
"reflect-metadata": "^0.1.13",
"sanitize-html": "^2.11.0",
"socket.io": "^4.6.1",
Expand Down
6 changes: 5 additions & 1 deletion packages/server/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export default class Start extends Command {
LANGCHAIN_TRACING_V2: Flags.string(),
LANGCHAIN_ENDPOINT: Flags.string(),
LANGCHAIN_API_KEY: Flags.string(),
LANGCHAIN_PROJECT: Flags.string()
LANGCHAIN_PROJECT: Flags.string(),
DISABLE_FLOWISE_TELEMETRY: Flags.string()
}

async stopProcess() {
Expand Down Expand Up @@ -113,6 +114,9 @@ export default class Start extends Command {
if (flags.LANGCHAIN_API_KEY) process.env.LANGCHAIN_API_KEY = flags.LANGCHAIN_API_KEY
if (flags.LANGCHAIN_PROJECT) process.env.LANGCHAIN_PROJECT = flags.LANGCHAIN_PROJECT

// Telemetry
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY

await (async () => {
try {
logger.info('Starting Flowise...')
Expand Down
43 changes: 42 additions & 1 deletion packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ import {
getSessionChatHistory,
getAllConnectedNodes,
clearSessionMemory,
findMemoryNode
findMemoryNode,
getTelemetryFlowObj,
getAppVersion
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
Expand All @@ -64,13 +66,15 @@ import { sanitizeMiddleware } from './utils/XSS'
import axios from 'axios'
import { Client } from 'langchainhub'
import { parsePrompt } from './utils/hub'
import { Telemetry } from './utils/telemetry'
import { Variable } from './database/entities/Variable'

export class App {
app: express.Application
nodesPool: NodesPool
chatflowPool: ChatflowPool
cachePool: CachePool
telemetry: Telemetry
AppDataSource = getDataSource()

constructor() {
Expand Down Expand Up @@ -105,6 +109,9 @@ export class App {

// Initialize cache pool
this.cachePool = new CachePool()

// Initialize telemetry
this.telemetry = new Telemetry()
})
.catch((err) => {
logger.error('❌ [server]: Error during Data Source initialization:', err)
Expand Down Expand Up @@ -388,6 +395,12 @@ export class App {
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)

await this.telemetry.sendTelemetry('chatflow_created', {
version: await getAppVersion(),
chatlowId: results.id,
flowGraph: getTelemetryFlowObj(JSON.parse(results.flowData)?.nodes, JSON.parse(results.flowData)?.edges)
})

return res.json(results)
})

Expand Down Expand Up @@ -674,6 +687,12 @@ export class App {
const tool = this.AppDataSource.getRepository(Tool).create(newTool)
const results = await this.AppDataSource.getRepository(Tool).save(tool)

await this.telemetry.sendTelemetry('tool_created', {
version: await getAppVersion(),
toolId: results.id,
toolName: results.name
})

return res.json(results)
})

Expand Down Expand Up @@ -880,6 +899,11 @@ export class App {
const assistant = this.AppDataSource.getRepository(Assistant).create(newAssistant)
const results = await this.AppDataSource.getRepository(Assistant).save(assistant)

await this.telemetry.sendTelemetry('assistant_created', {
version: await getAppVersion(),
assistantId: results.id
})

return res.json(results)
})

Expand Down Expand Up @@ -1508,6 +1532,15 @@ export class App {
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))

this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)

await this.telemetry.sendTelemetry('vector_upserted', {
version: await getAppVersion(),
chatlowId: chatflowid,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges),
stopNodeId
})

return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
Expand Down Expand Up @@ -1784,6 +1817,13 @@ export class App {
await this.addChatMessage(apiMessage)

logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await this.telemetry.sendTelemetry('prediction_sent', {
version: await getAppVersion(),
chatlowId: chatflowid,
chatId,
type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL,
flowGraph: getTelemetryFlowObj(nodes, edges)
})

// Only return ChatId when its Internal OR incoming input has ChatId, to avoid confusion when calling API
if (incomingInput.chatId || isInternal) result.chatId = chatId
Expand All @@ -1798,6 +1838,7 @@ export class App {
async stopApp() {
try {
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
await Promise.all(removePromises)
} catch (e) {
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
Expand Down
56 changes: 56 additions & 0 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1082,3 +1082,59 @@ export const getAllValuesFromJson = (obj: any): any[] => {
extractValues(obj)
return values
}

/**
* Get only essential flow data items for telemetry
* @param {IReactFlowNode[]} nodes
* @param {IReactFlowEdge[]} edges
*/
export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEdge[]) => {
const nodeData = nodes.map((node) => node.id)
const edgeData = edges.map((edge) => ({ source: edge.source, target: edge.target }))
return { nodes: nodeData, edges: edgeData }
}

/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}

/**
* Get app current version
*/
export const getAppVersion = async () => {
const getPackageJsonPath = (): string => {
const checkPaths = [
path.join(__dirname, '..', 'package.json'),
path.join(__dirname, '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', 'package.json'),
path.join(__dirname, '..', '..', '..', '..', '..', 'package.json')
]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}

const packagejsonPath = getPackageJsonPath()
if (!packagejsonPath) return ''
try {
const content = await fs.promises.readFile(packagejsonPath, 'utf8')
const parsedContent = JSON.parse(content)
return parsedContent.version
} catch (error) {
return ''
}
}
50 changes: 50 additions & 0 deletions packages/server/src/utils/telemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { v4 as uuidv4 } from 'uuid'
import { PostHog } from 'posthog-node'
import path from 'path'
import fs from 'fs'
import { getUserHome, getUserSettingsFilePath } from '.'

export class Telemetry {
postHog?: PostHog

constructor() {
if (process.env.DISABLE_FLOWISE_TELEMETRY !== 'true') {
this.postHog = new PostHog('phc_jEDuFYnOnuXsws986TLWzuisbRjwFqTl9JL8tDMgqme')
} else {
this.postHog = undefined
}
}

async id(): Promise<string> {
try {
const settingsContent = await fs.promises.readFile(getUserSettingsFilePath(), 'utf8')
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
const instanceId = uuidv4()
const settings = {
instanceId
}
const defaultLocation = path.join(getUserHome(), '.flowise', 'settings.json')
await fs.promises.writeFile(defaultLocation, JSON.stringify(settings, null, 2))
return instanceId
}
}

async sendTelemetry(event: string, properties = {}): Promise<void> {
if (this.postHog) {
const distinctId = await this.id()
this.postHog.capture({
event,
distinctId,
properties
})
}
}

async flush(): Promise<void> {
if (this.postHog) {
await this.postHog.shutdownAsync()
}
}
}