From e25a66b534b82bfa80d6c6cacf7eb5db03bdec0b Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Sun, 15 Dec 2024 17:44:34 -0500 Subject: [PATCH] wip --- lib/session/call-session.js | 12 +++-- lib/tasks/say.js | 7 ++- lib/tasks/tts-task.js | 8 +-- lib/utils/tts-streaming-buffer.js | 88 ++++++++++++++++++------------- 4 files changed, 65 insertions(+), 50 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 80c83d0b..25eeb89b 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -589,9 +589,7 @@ class CallSession extends Emitter { const {vendor} = this.backgroundTaskManager.getTask('ttsStream').getTtsVendorData(this); v = vendor; } - - //TMP!!! - return 'cartesia'; + return v; } get appIsUsingWebsockets() { @@ -833,16 +831,20 @@ class CallSession extends Emitter { } } - async disableTtsStream() { + disableTtsStream() { if (this.isTtsStreamEnabled) { this.backgroundTaskManager.stop('ttsStream'); this.logger.debug('CallSession:disableTtsStream - ttsStream disabled'); } } - async clearTtsStream() { + clearTtsStream() { this.ttsStreamingBuffer?.clear(); } + startTtsStream() { + this.ttsStreamingBuffer?.start(); + } + async enableBotMode(gather, autoEnable) { try { let task; diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 591af887..4d2b2ff3 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -109,8 +109,11 @@ class TaskSay extends TtsTask { } try { - //TMP!! - await this.setTtsStreamingChannelVars(/*vendor*/ 'cartesia', language, voice, credentials, ep); + + await this.setTtsStreamingChannelVars(vendor, language, voice, credentials, ep); + + await cs.startTtsStream(); + cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'}) .catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending')); } catch (err) { diff --git a/lib/tasks/tts-task.js b/lib/tasks/tts-task.js index f6d49b0d..203b2b46 100644 --- a/lib/tasks/tts-task.js +++ b/lib/tasks/tts-task.js @@ -59,7 +59,7 @@ class TtsTask extends Task { } async setTtsStreamingChannelVars(vendor, language, voice, credentials, ep) { - let {api_key, cartesia_model_id, cartesia_voice_id} = credentials; + const {api_key, cartesia_model_id, cartesia_voice_id} = credentials; let obj; switch (vendor) { @@ -70,12 +70,6 @@ class TtsTask extends Task { }; break; case 'cartesia': - //TMP!! - cartesia_model_id = 'sonic'; - cartesia_voice_id = 'f785af04-229c-4a7c-b71b-f3194c7f08bb'; - api_key = 'sk_car_gYBQoqC19S8gK2lLHhnTT'; - //TMP!! - obj = { CARTESIA_API_KEY: api_key, CARTESIA_TTS_STREAMING_MODEL_ID: cartesia_model_id, diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index 5a3b297a..c4b70275 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -8,7 +8,32 @@ const FEED_INTERVAL = 2000; const MAX_CHUNK_SIZE = 1800; const HIGH_WATER_BUFFER_SIZE = 5000; const LOW_WATER_BUFFER_SIZE = 1000; +const MIN_INITIAL_WORDS = 4; +const findSentenceBoundary = (text, limit) => { + const sentenceEndRegex = /[.!?](?=\s|$)/g; + let lastSentenceBoundary = -1; + let match; + + while ((match = sentenceEndRegex.exec(text)) && match.index < limit) { + /* Ensure it's not a decimal point (e.g., "3.14") */ + if (match.index === 0 || !/\d$/.test(text[match.index - 1])) { + lastSentenceBoundary = match.index + 1; // Include the punctuation + } + } + return lastSentenceBoundary; +}; + +const findWordBoundary = (text, limit) => { + const wordBoundaryRegex = /\s+/g; + let lastWordBoundary = -1; + let match; + + while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) { + lastWordBoundary = match.index; + } + return lastWordBoundary; +}; class TtsStreamingBuffer extends Emitter { constructor(cs) { @@ -21,6 +46,7 @@ class TtsStreamingBuffer extends Emitter { this._isFull = false; this._connectionStatus = TtsStreamingConnectionStatus.NotConnected; this._flushPending = false; + this._countSendsInThisTurn = 0; } get isEmpty() { @@ -130,6 +156,7 @@ class TtsStreamingBuffer extends Emitter { return; } else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) { + this._countSendsInThisTurn = 0; this._api(this.ep, [this.ep.uuid, 'flush']) .catch((err) => this.logger.info({err}, `TtsStreamingBuffer:flush Error flushing TTS streaming: ${JSON.stringify(err)}`)); @@ -153,7 +180,7 @@ class TtsStreamingBuffer extends Emitter { * Return the number of tokens left in the buffer. */ async _feedTokens() { - this.logger.debug('_feedTokens'); + this.logger.debug({tokens: this.tokens}, '_feedTokens'); try { if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) { @@ -163,7 +190,7 @@ class TtsStreamingBuffer extends Emitter { if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected || this._connectionStatus === TtsStreamingConnectionStatus.Failed) { - await this.start(); + this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not connected'); return this.tokens.length; } @@ -172,58 +199,47 @@ class TtsStreamingBuffer extends Emitter { return this.tokens.length; } - // Helper function to find a sentence boundary - const findSentenceBoundary = (text, limit) => { - const sentenceEndRegex = /[.!?](?=\s|$)/g; - let lastSentenceBoundary = -1; - let match; - - while ((match = sentenceEndRegex.exec(text)) && match.index < limit) { - // Ensure it's not a decimal point (e.g., "3.14") - if (match.index === 0 || !/\d$/.test(text[match.index - 1])) { - lastSentenceBoundary = match.index + 1; // Include the punctuation - } + /** + * Rules: + * 1. If this is our first send, we must have at least N words + * 2. Otherwise, must EITHER have N words OR be the ending of a sentence + * + * When sending, send the max size possible, capped at a limit to avoid overwhelming the server. + */ + + /* must have at least N words, or be the ending of a sentence */ + const words = this.tokens.split(' ').length; + if (words < MIN_INITIAL_WORDS) { + const endsWithPunctuation = /[.!?]$/.test(this.tokens); + if (!endsWithPunctuation || this._countSendsInThisTurn === 0) { + this.logger.debug(`TtsStreamingBuffer:_feedTokens: only ${words} words to send, waiting for more`); + return this.tokens.length; } - return lastSentenceBoundary; - }; - - // Helper function to find a word boundary - const findWordBoundary = (text, limit) => { - const wordBoundaryRegex = /\s+/g; - let lastWordBoundary = -1; - let match; - - while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) { - lastWordBoundary = match.index; - } - return lastWordBoundary; - }; + } - // Try to find the best chunk to send const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length); let chunkEnd = findSentenceBoundary(this.tokens, limit); if (chunkEnd === -1) { - // If no sentence boundary, try word boundary - - //TMP!! lets try forcing full sentences - this.logger.debug('TtsStreamingBuffer:_feedTokens: no sentence boundary found, waiting for full sentence'); - return this.tokens.length; - //chunkEnd = findWordBoundary(this.tokens, limit); + this.logger.debug('TtsStreamingBuffer:_feedTokens: no sentence boundary found, look for word boundary'); + chunkEnd = findWordBoundary(this.tokens, limit); } if (chunkEnd === -1) { - // If no boundaries at all, just take the max allowed chunkEnd = limit; } const chunk = this.tokens.slice(0, chunkEnd); - this.tokens = this.tokens.slice(chunkEnd); // Remove sent chunk and trim whitespace + this.tokens = this.tokens.slice(chunkEnd); // Remove sent chunk /* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */ const modifiedChunk = chunk.replace(/\n\n/g, '\n \n'); + if (modifiedChunk.length > 0) { try { + this._countSendsInThisTurn++; + this.logger.debug({tokens: modifiedChunk}, + `TtsStreamingBuffer:_feedTokens: sending tokens, in send#${this._countSendsInThisTurn}`); await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); } catch (err) { this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk');