Skip to content

Tracking Azure ServiceBus message processing with Apache Qpid (2.x)

Trask Stalnaker edited this page Jan 20, 2021 · 1 revision

This article provides an example of how to implement distributed tracing manually when using third-party AMQP libraries to process ServiceBus messages.

Azure ServiceBus for .NET has native support for distributed tracing and works with Application Insights. We are working on providing similar behavior in Azure ServiceBus SDK for Java.

The example works with the protocol described in this article, which is supported by the Azure ServiceBus .NET SDK. In the future, it will be changed to the W3C distributed tracing protocol for AMQP.

Example

import com.microsoft.applicationinsights.TelemetryClient;
import com.microsoft.applicationinsights.TelemetryConfiguration;
import com.microsoft.applicationinsights.internal.util.DateTimeUtils;
import com.microsoft.applicationinsights.telemetry.RequestTelemetry;
import com.microsoft.applicationinsights.web.extensibility.initializers.WebAppNameContextInitializer;
import com.microsoft.applicationinsights.web.extensibility.initializers.WebOperationIdTelemetryInitializer;
import com.microsoft.applicationinsights.web.internal.RequestTelemetryContext;
import com.microsoft.applicationinsights.web.internal.ThreadContext;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.nio.charset.StandardCharsets;
import java.util.Hashtable;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.util.TypeConversionSupport;

/**
 * Sample worker that receives messages from a queue over AMQP (Apache Qpid JMS) and reports
 * one request telemetry item per message, correlated with the sender through the
 * {@code Diagnostic-Id} message property.
 */
public class Application {

  private static final String DIAGNOSTIC_ID_PROPERTY_NAME = "Diagnostic-Id";
  protected static final TelemetryClient telemetryClient = new TelemetryClient();

  /**
   * Configures telemetry, connects to the queue, and processes messages until a byte is read
   * from standard input.
   *
   * @param args unused
   * @throws Exception if JNDI lookup, connection set-up, or shutdown fails
   */
  public static void main(String[] args) throws Exception {
    // set up instrumentation key
    TelemetryConfiguration.getActive().setInstrumentationKey("Your instrumentation key");

    // add telemetry initializer that is responsible for correlation
    TelemetryConfiguration.getActive().getTelemetryInitializers().add(new WebOperationIdTelemetryInitializer());

    // add Context initializer responsible for role name, so it would look right on the Application Map
    WebAppNameContextInitializer appNameInitializer = new WebAppNameContextInitializer();
    appNameInitializer.setAppName("java-worker");
    TelemetryConfiguration.getActive().getContextInitializers().add(appNameInitializer);

    String connectionString = "Endpoint=sb://my-servicebus.servicebus.windows.net/....";
    ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString);

    // set up JNDI context
    Hashtable<String, String> hashtable = new Hashtable<>();
    hashtable.put("connectionfactory.SBCF", "amqps://" + csb.getEndpoint().getHost() + "?amqp.idleTimeout=120000&amqp.traceFrames=true");
    hashtable.put("queue.QUEUE", "queue");
    hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    Context context = new InitialContext(hashtable);
    ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");

    Destination queue = (Destination) context.lookup("QUEUE");

    // Create Connection
    Connection connection = cf.createConnection(csb.getSasKeyName(), csb.getSasKey());
    try {
      connection.start();
      // Create Session, no transaction, client ack
      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
      // Create consumer
      MessageConsumer consumer = session.createConsumer(queue);
      // create a listener callback to receive the messages
      consumer.setMessageListener(Application::processMessage);

      // keep receiving until the user presses a key
      System.in.read();

      consumer.close();
      session.close();
      connection.stop();
    } finally {
      // Always release the connection and push out buffered telemetry, even when
      // set-up or shutdown above failed part-way through.
      try {
        connection.close();
      } finally {
        telemetryClient.flush();
        // give the telemetry channel some time to send before the JVM exits
        Thread.sleep(5000);
      }
    }
  }

  /**
   * Handles a single message: acknowledges it, inspects the body, and tracks the outcome as a
   * request. Any failure is tracked as an exception and marks the request as unsuccessful.
   *
   * @param message the received message
   */
  private static void processMessage(Message message) {
    RequestTelemetry request = createRequestTelemetry(message);

    try {
      // NOTE(review): the message is acknowledged before it is processed, so a failure
      // below does not cause redelivery - confirm this is the intended delivery guarantee.
      message.acknowledge();

      String body = readBody(message);

      // a missing or unsupported body is not treated as a failure
      if (body != null && body.contains("exception")) {
        throw new Exception("body is invalid");
      }
      request.setSuccess(true);
    } catch (Exception e) {
      request.setSuccess(false);
      telemetryClient.trackException(e);
    } finally {
      telemetryClient.trackRequest(request);
    }
  }

  /**
   * Extracts the message body as text.
   *
   * @param message the received message
   * @return the body decoded as UTF-8 for a {@link BytesMessage}, the text of a
   *     {@link TextMessage}, or {@code null} when the message has no body or is of another type
   * @throws JMSException if the provider fails to read the body
   */
  private static String readBody(Message message) throws JMSException {
    if (message instanceof BytesMessage) {
      byte[] payload = message.getBody(byte[].class);
      // explicit charset: the platform default differs between machines
      return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
    }
    if (message instanceof TextMessage) {
      return ((TextMessage) message).getText();
    }
    return null;
  }

  /**
   * Reads the {@code Diagnostic-Id} property from the message.
   *
   * @param message the received message
   * @return the diagnostic id, or {@code null} when the message is not a Qpid
   *     {@link JmsMessage}, the property is absent, or reading it fails
   */
  private static String getDiagnosticId(Message message) {
    // instanceof is false for null and for messages from other JMS providers
    if (!(message instanceof JmsMessage)) {
      return null;
    }

    // The property is read through the Qpid facade rather than the JMS property API,
    // presumably because the hyphenated name is not a valid JMS identifier - TODO confirm.
    JmsMessageFacade facade = ((JmsMessage) message).getFacade();

    try {
      if (facade != null && facade.propertyExists(DIAGNOSTIC_ID_PROPERTY_NAME)) {
        Object diagIdObj = facade.getProperty(DIAGNOSTIC_ID_PROPERTY_NAME);
        return (String) TypeConversionSupport.convert(diagIdObj, String.class);
      }
    } catch (JMSException e) {
      // report through telemetry, same as processing failures, instead of stderr
      telemetryClient.trackException(e);
    }

    return null;
  }

  /**
   * Extracts the root part of an id: the text before the first {@code '.'}, with a single
   * leading {@code '|'} removed.
   *
   * @param parentId the id received with the message; may be {@code null} or empty
   * @return the root id, or {@code parentId} unchanged when it is {@code null} or empty
   */
  private static String extractRootId(String parentId) {
    if (parentId == null || parentId.isEmpty()) {
      return parentId;
    }

    int rootEnd = parentId.indexOf('.');
    if (rootEnd < 0) {
      rootEnd = parentId.length();
    }

    int rootStart = parentId.charAt(0) == '|' ? 1 : 0;
    // max() keeps the range valid for ids such as "|" where no '.' follows the prefix
    return parentId.substring(rootStart, Math.max(rootStart, rootEnd));
  }

  /**
   * Creates the request telemetry for a message and installs its context on the current
   * thread so that other telemetry reported while processing can be correlated with it.
   *
   * @param message the received message
   * @return the request telemetry to complete and track once processing finishes
   */
  private static RequestTelemetry createRequestTelemetry(Message message) {

    // get parentId from the message
    String diagnosticId = getDiagnosticId(message);

    // create context - it will allow to correlate other telemetry reported in scope of this request
    RequestTelemetryContext requestCtx = new RequestTelemetryContext(
        DateTimeUtils.getDateTimeNow().getTime());

    RequestTelemetry request = requestCtx.getHttpRequestTelemetry();

    // set proper correlation ids; a message sent without a diagnostic id has nothing to
    // correlate with, so the operation ids are left untouched in that case
    if (diagnosticId != null && !diagnosticId.isEmpty()) {
      request.getContext().getOperation().setParentId(diagnosticId);
      request.getContext().getOperation().setId(extractRootId(diagnosticId));
    }

    // set meaningful name
    request.setName("Process message");

    // mimic AppInsights .NET service bus properties
    request.setSource("type:Azure Service Bus | name:queue | endpoint:sb://my.servicebus.windows.net/");

    // make sure context is set on the thread.
    ThreadContext.setRequestTelemetryContext(requestCtx);

    return request;
  }
}

Views

End-to-end transaction viewer

e2e

Application Map

appmap

Useful links

If you want to use your own protocol for correlation or implement it on the consumer side, please go through the following articles: