Skip to content

Commit

Permalink
Merge pull request #61 from airtai/dev
Browse files Browse the repository at this point in the history
Dev #58
  • Loading branch information
harishmohanraj authored Jul 16, 2024
2 parents d7c267a + 92e8424 commit c5becf8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 18 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/fly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ jobs:
wasp deploy fly create-db mia
wasp deploy fly deploy
REACT_APP_NAME=$REACT_APP_NAME REACT_APP_SUPPORT_EMAIL=$REACT_APP_SUPPORT_EMAIL wasp deploy fly cmd secrets set \
FASTAGENCY_DEPLOYMENT_UUID=$FASTAGENCY_DEPLOYMENT_UUID FASTAGENCY_SERVER_URL=$FASTAGENCY_SERVER_URL --context=server
FASTAGENCY_DEPLOYMENT_UUID=$FASTAGENCY_DEPLOYMENT_UUID FASTAGENCY_SERVER_URL=$FASTAGENCY_SERVER_URL AUTH_TOKEN=$AUTH_TOKEN DEVELOPER_UUID=$DEVELOPER_UUID --context=server
flyctl scale count 1 --app ${APP_NAME}-client --yes
flyctl scale count 1 --app ${APP_NAME}-server --yes
echo "APP_NAME=$APP_NAME" >> $GITHUB_ENV
working-directory: app
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
FASTAGENCY_DEPLOYMENT_UUID: ${{ secrets.FASTAGENCY_DEPLOYMENT_UUID }}
AUTH_TOKEN: ${{ secrets.AUTH_TOKEN }}
DEVELOPER_UUID: ${{ secrets.DEVELOPER_UUID }}
REACT_APP_NAME: ${{ vars.REACT_APP_NAME }}
FLY_IO_APP_NAME: ${{ vars.FLY_IO_APP_NAME }}
REACT_APP_SUPPORT_EMAIL: ${{ vars.REACT_APP_SUPPORT_EMAIL }}
Expand Down Expand Up @@ -141,11 +143,13 @@ jobs:
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
FASTAGENCY_DEPLOYMENT_UUID: ${{ secrets.FASTAGENCY_DEPLOYMENT_UUID }}
AUTH_TOKEN: ${{ secrets.AUTH_TOKEN }}
DEVELOPER_UUID: ${{ secrets.DEVELOPER_UUID }}
REACT_APP_NAME: ${{ vars.REACT_APP_NAME }}
REACT_APP_SUPPORT_EMAIL: ${{ vars.REACT_APP_SUPPORT_EMAIL }}
FASTAGENCY_SERVER_URL: ${{ vars.FASTAGENCY_SERVER_URL }}

run: |
wasp deploy fly deploy
REACT_APP_NAME=$REACT_APP_NAME REACT_APP_SUPPORT_EMAIL=$REACT_APP_SUPPORT_EMAIL wasp deploy fly cmd secrets set \
FASTAGENCY_DEPLOYMENT_UUID=$FASTAGENCY_DEPLOYMENT_UUID FASTAGENCY_SERVER_URL=$FASTAGENCY_SERVER_URL --context=server
FASTAGENCY_DEPLOYMENT_UUID=$FASTAGENCY_DEPLOYMENT_UUID FASTAGENCY_SERVER_URL=$FASTAGENCY_SERVER_URL AUTH_TOKEN=$AUTH_TOKEN DEVELOPER_UUID=$DEVELOPER_UUID --context=server
56 changes: 40 additions & 16 deletions app/src/server/websocket/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ function generateNatsUrl(natsUrl: string | undefined, fastAgencyServerUrl: strin
const NATS_URL = generateNatsUrl(process.env['NATS_URL'], FASTAGENCY_SERVER_URL);
console.log(`NATS_URL=${NATS_URL}`);

const AUTH_TOKEN = process.env['AUTH_TOKEN'];
const DEVELOPER_UUID = process.env['DEVELOPER_UUID'];
const FASTAGENCY_DEPLOYMENT_UUID = process.env['FASTAGENCY_DEPLOYMENT_UUID'];

const timeoutErrorMsg = 'Oops! Something went wrong. Please create a new chat and try again.';

class NatsConnectionManager {
Expand All @@ -27,16 +31,26 @@ class NatsConnectionManager {

static async getConnection(threadId: string, conversationId: number) {
if (!this.connections.has(threadId)) {
const nc = await connect({ servers: NATS_URL });
this.connections.set(threadId, {
nc,
subscriptions: new Map(),
socketConversationHistory: '',
lastSocketMessage: null,
conversationId: conversationId,
timeoutId: null,
});
console.log(`Connected to ${nc.getServer()} for threadId ${threadId}`);
try {
const token = {
user: FASTAGENCY_DEPLOYMENT_UUID,
password: AUTH_TOKEN,
chat_uuid: threadId,
};
const nc = await connect({ servers: NATS_URL, token: JSON.stringify(token) });
this.connections.set(threadId, {
nc,
subscriptions: new Map(),
socketConversationHistory: '',
lastSocketMessage: null,
conversationId: conversationId,
timeoutId: null,
});
console.log(`Connected to ${nc.getServer()} for threadId ${threadId}`);
} catch (error: any) {
console.error('Failed to connect to NATS server for threadId %s:', threadId, error);
throw new Error(`${error}`);
}
}
return this.connections.get(threadId);
}
Expand Down Expand Up @@ -126,7 +140,7 @@ async function setupSubscription(
NatsConnectionManager.addSubscription(threadId, subject, sub as Subscription);
} catch (err) {
console.error(`Error in subscribe for ${subject}: ${err}`);
return;
throw new Error(`${err}`);
}
(async () => {
for await (const m of sub) {
Expand Down Expand Up @@ -184,17 +198,25 @@ export async function sendMsgToNatsServer(
) {
try {
const threadId = currentChatDetails.uuid;
const { nc } = (await NatsConnectionManager.getConnection(threadId, conversationId)) as { nc: any };
const { nc } = (await NatsConnectionManager.getConnection(threadId, conversationId)) as {
nc: any;
};
const js = nc.jetstream();
const jc = JSONCodec();

// Initiate chat or continue conversation
const initiateChatSubject = `chat.server.initiate_chat`;
const serverInputSubject = `chat.server.messages.${threadId}`;
const serverInputSubject = `chat.server.messages.${DEVELOPER_UUID}.${FASTAGENCY_DEPLOYMENT_UUID}.${threadId}`;
const subject = shouldCallInitiateChat ? initiateChatSubject : serverInputSubject;

NatsConnectionManager.clearConversationHistory(threadId);
const payload = { user_id: userUUID, thread_id: threadId, team_id: selectedTeamUUID, msg: message };
const payload = {
user_id: DEVELOPER_UUID,
thread_id: threadId,
team_id: selectedTeamUUID,
msg: message,
deployment_id: FASTAGENCY_DEPLOYMENT_UUID,
};
console.log('-----------');
console.log(selectedTeamUUID);
console.log(payload);
Expand All @@ -211,12 +233,14 @@ export async function sendMsgToNatsServer(
NatsConnectionManager.setTimeout(threadId, timeoutCallback, 45000);

if (shouldCallInitiateChat) {
const clientInputSubject = `chat.client.messages.${threadId}`;
const clientInputSubject = `chat.client.messages.${DEVELOPER_UUID}.${FASTAGENCY_DEPLOYMENT_UUID}.${threadId}`;
await setupSubscription(js, jc, clientInputSubject, threadId, socket, context, currentChatDetails);
} else {
NatsConnectionManager.setConversationId(threadId, conversationId);
}
} catch (err) {
} catch (err: any) {
console.error(`Error in connectToNatsServer: ${err}`);
await updateDB(context, currentChatDetails.id, err.toString(), conversationId, '', true);
socket.emit('streamFromTeamFinished');
}
}

0 comments on commit c5becf8

Please sign in to comment.