Skip to content

Commit

Permalink
removing xray from messaging and caching for now
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Jones committed Aug 29, 2024
1 parent c9d6a21 commit 3479533
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 45 deletions.
36 changes: 18 additions & 18 deletions lib/cache/redisClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const sts = new AWS.STS({


async function assumeRole() {
const segment = AWSXRay.getSegment() || new AWSXRay.Segment('assumeRole');
AWSXRay.setSegment(segment)
//const segment = AWSXRay.getSegment() || new AWSXRay.Segment('assumeRole');
// AWSXRay.setSegment(segment)
try{
const token = fs.readFileSync(tokenFile, 'utf8');

Expand All @@ -35,15 +35,15 @@ async function assumeRole() {
sessionToken: data.Credentials.SessionToken
};
} catch (error) {
segment.addError(error);
segment.close();
// segment.addError(error);
// segment.close();
throw error;
}

}

async function getRedisCredentials() {
const segment = AWSXRay.getSegment() || new AWSXRay.Segment('getRedisCredentials');
// const segment = AWSXRay.getSegment() || new AWSXRay.Segment('getRedisCredentials');
try {
const credentials = await assumeRole();
const secretsManager = new AWS.SecretsManager({
Expand All @@ -52,7 +52,7 @@ async function getRedisCredentials() {
});

const data = await secretsManager.getSecretValue({ SecretId: secretId }).promise();
segment.close();
// segment.close();
if ('SecretString' in data) {
return JSON.parse(data.SecretString);
} else {
Expand All @@ -61,14 +61,14 @@ async function getRedisCredentials() {
}
} catch (err) {
logger.error('Error retrieving Redis credentials:', err);
segment.addError(error);
segment.close();
// segment.addError(error);
// segment.close();
throw err;
}
}

(async function initializeRedis() {
const segment = AWSXRay.getSegment() || new AWSXRay.Segment('initializeRedis');
// const segment = AWSXRay.getSegment() || new AWSXRay.Segment('initializeRedis');
try {
let redisConfig;

Expand All @@ -94,26 +94,26 @@ async function getRedisCredentials() {

redisClient.on('connect', () => {
logger.info('Connected to Redis');
const connectSegment = segment.addNewSubsegment('RedisConnect');
connectSegment.addAnnotation('message', 'Connected to Redis');
connectSegment.close();
// const connectSegment = segment.addNewSubsegment('RedisConnect');
// connectSegment.addAnnotation('message', 'Connected to Redis');
//connectSegment.close();
});

redisClient.on('error', (err) => {
logger.error('Redis error:', err);
const errorSegment = segment.addNewSubsegment('RedisError');
errorSegment.addError(err);
errorSegment.close();
// const errorSegment = segment.addNewSubsegment('RedisError');
// errorSegment.addError(err);
// errorSegment.close();
});

module.exports = redisClient;

} catch (err) {
logger.error('Failed to initialize Redis client:', err);
segment.addError(err);
segment.close();
// segment.addError(err);
// segment.close();
process.exit(1);
} finally {
segment.close();
// segment.close();
}
})();
54 changes: 27 additions & 27 deletions lib/mq/mqUtils.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const amqp = require('amqplib');
const logger = require('../logging/logger');
const { formatError } = require('../logging/loggerUtil');
const AWSXRay = require('aws-xray-sdk');
//const AWSXRay = require('aws-xray-sdk');

const RABBITMQ_URL = process.env.RABBITMQ_URL;
const SERVICE_NAME = process.env.SERVICE_NAME;
Expand All @@ -20,56 +20,56 @@ class MockSegment {


async function connect() {
const segment = new AWSXRay.Segment('RabbitMQ-Connect');
AWSXRay.setSegment(segment)
// const segment = new AWSXRay.Segment('RabbitMQ-Connect');
// AWSXRay.setSegment(segment)
try {
const connection = await amqp.connect(RABBITMQ_URL);
connection.on('error', (error) => {
logger.error('Error from RabbitMQ Connection:', formatError(error));
if (segment) segment.addError(error);
// if (segment) segment.addError(error);

});
const channel = await connection.createChannel();
channel.on('error', (error) => {
logger.error('Error from RabbitMQ Channel:', formatError(error));
if (segment) segment.addError(error);
// if (segment) segment.addError(error);

});

return { channel, connection };
} catch (error) {
logger.error('Failed to connect to RabbitMQ:', formatError(error));
if (segment) segment.addError(error);
// if (segment) segment.addError(error);
throw error;
} finally {
if (segment) segment.close();
// if (segment) segment.close();
}
}

const retryConnect = async (maxRetries = 5, delay = 1000) => {
const segment = new AWSXRay.Segment('RabbitMQ-RetryConnect');
AWSXRay.setSegment(segment)
// const segment = new AWSXRay.Segment('RabbitMQ-RetryConnect');
// AWSXRay.setSegment(segment)
for (let i = 0; i < maxRetries; i++) {
try {
const connectionData = await connect();
if (segment) segment.close();
// if (segment) segment.close();
return connectionData;
} catch (error) {
logger.error(`Retry ${i + 1}: Failed to connect to RabbitMQ`, formatError(error));
if (segment) segment.addError(error);
// if (segment) segment.addError(error);
await new Promise(resolve => setTimeout(resolve, delay));
delay *= 2;
}
}
const error = new Error('Exceeded max retries for connecting to RabbitMQ');
if (segment) segment.addError(error);
if (segment) segment.close();
// if (segment) segment.addError(error);
// if (segment) segment.close();
throw error;
};

async function createAndBindQueue(channel) {
const segment = new AWSXRay.Segment('RabbitMQ-CreateAndBindQueue');
AWSXRay.setSegment(segment)
// const segment = new AWSXRay.Segment('RabbitMQ-CreateAndBindQueue');
// AWSXRay.setSegment(segment)
try {
logger.info(`Using queue name: ${QUEUE_NAME}`);
await channel.assertExchange('fanoutExchange', 'fanout');
Expand All @@ -79,45 +79,45 @@ async function createAndBindQueue(channel) {
return true;
} catch (error) {
logger.error("Error interacting with RabbitMQ:", formatError(error));
if (segment) segment.addError(error);
// if (segment) segment.addError(error);
return false;
}finally {
if (segment) segment.close();
// if (segment) segment.close();
}
}

async function listenToQueue(channel, callback) {
const segment = new AWSXRay.Segment('RabbitMQ-ListenToQueue');
AWSXRay.setSegment(segment)
// const segment = new AWSXRay.Segment('RabbitMQ-ListenToQueue');
// AWSXRay.setSegment(segment)
channel.consume(QUEUE_NAME, (message) => {
const subsegment = segment.addNewSubsegment('Process-Message');
// const subsegment = segment.addNewSubsegment('Process-Message');
try {
const parsedMessage = JSON.parse(message.content.toString());
callback(parsedMessage, () => {
channel.ack(message);
});
} catch (processingError) {
logger.error('Error processing message:', formatError(processingError));
if (subsegment) subsegment.addError(processingError);
// if (subsegment) subsegment.addError(processingError);
channel.nack(message);
} finally {
if (subsegment) subsegment.close();
// if (subsegment) subsegment.close();
}
}, { noAck: false });
if (segment) segment.close();
//if (segment) segment.close();
}

async function publishMessage(channel, message) {
const segment = new AWSXRay.Segment('RabbitMQ-PublishMessage');
AWSXRay.setSegment(segment)
// const segment = new AWSXRay.Segment('RabbitMQ-PublishMessage');
// AWSXRay.setSegment(segment)
try {
channel.publish('fanoutExchange', '', Buffer.from(JSON.stringify(message)));
} catch (publishError) {
logger.error('Error publishing message:', formatError(publishError));
if (segment) segment.addError(publishError);
// if (segment) segment.addError(publishError);
throw publishError;
} finally {
if (segment) segment.close();
// if (segment) segment.close();
}
}

Expand Down

0 comments on commit 3479533

Please sign in to comment.