props = new HashMap<>();
props.put("mq.message.builder.offset.property", topicProperty);
- DefaultMessageBuilder builder = new DefaultMessageBuilder();
+ final DefaultMessageBuilder builder = new DefaultMessageBuilder();
builder.configure(props);
- SinkRecord record = new SinkRecord("topic", 0, null, null, null, "message", OFFSET);
+ final SinkRecord record = new SinkRecord("topic", 0, null, null, null, "message", offset);
- Message message = builder.fromSinkRecord(getJmsContext(), record);
+ final Message message = builder.fromSinkRecord(getJmsContext(), record);
assertEquals("message", message.getBody(String.class));
- assertEquals(OFFSET, message.getLongProperty(topicProperty));
+ assertEquals(offset, message.getLongProperty(topicProperty));
}
}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/JsonRestApi.java b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/JsonRestApi.java
new file mode 100644
index 0000000..7a1b1d5
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/JsonRestApi.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2022, 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink.util;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
+import java.util.Base64;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+public class JsonRestApi {
+
+ public static JSONObject jsonPost(final String url, final String username, final String password,
+ final String payload) throws IOException, KeyManagementException, NoSuchAlgorithmException, JSONException {
+ final URL urlObj = new URL(url);
+ final HttpsURLConnection urlConnection = (HttpsURLConnection) urlObj.openConnection();
+ urlConnection.setHostnameVerifier(new IgnoreCertVerifier());
+ urlConnection.setSSLSocketFactory(getTrustAllCertsFactory());
+ urlConnection.setRequestProperty("Authorization", getAuthHeader(username, password));
+ urlConnection.setRequestProperty("Content-Type", "application/json");
+ urlConnection.setRequestProperty("ibm-mq-rest-csrf-token", "junit");
+ urlConnection.setDoOutput(true);
+
+ try (OutputStream os = urlConnection.getOutputStream()) {
+ final byte[] input = payload.getBytes("utf-8");
+ os.write(input, 0, input.length);
+ }
+
+ try (InputStream input = urlConnection.getInputStream()) {
+ final BufferedReader re = new BufferedReader(new InputStreamReader(input, Charset.forName("utf-8")));
+ return new JSONObject(read(re));
+ }
+ }
+
+ private static String read(final Reader re) throws IOException {
+ final StringBuilder str = new StringBuilder();
+ int ch;
+ do {
+ ch = re.read();
+ str.append((char) ch);
+ } while (ch != -1);
+ return str.toString();
+ }
+
+ private static String getAuthHeader(final String username, final String password) {
+ final String userpass = username + ":" + password;
+ final String basicAuth = "Basic " + new String(Base64.getEncoder().encode(userpass.getBytes()));
+ return basicAuth;
+ }
+
+ private static class IgnoreCertVerifier implements HostnameVerifier {
+ @Override
+ public boolean verify(final String host, final SSLSession session) {
+ return true;
+ }
+ }
+
+ private static SSLSocketFactory getTrustAllCertsFactory() throws NoSuchAlgorithmException, KeyManagementException {
+ final TrustManager[] trustAllCerts = new TrustManager[] {
+ new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ public void checkClientTrusted(final X509Certificate[] certs, final String authType) {
+ }
+
+ public void checkServerTrusted(final X509Certificate[] certs, final String authType) {
+ }
+ }
+ };
+ final SSLContext sc = SSLContext.getInstance("SSL");
+ sc.init(null, trustAllCerts, new java.security.SecureRandom());
+ return sc.getSocketFactory();
+ }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/MQRestAPIHelper.java b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/MQRestAPIHelper.java
new file mode 100644
index 0000000..1e13383
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/MQRestAPIHelper.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2022, 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink.util;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+
+public class MQRestAPIHelper {
+ private String qmgrname;
+ private int portnum;
+ private String password;
+
+ public MQRestAPIHelper(final String qmgrname, final int portnum, final String password) {
+ this.qmgrname = qmgrname;
+ this.portnum = portnum;
+ this.password = password;
+ }
+
+ public final Logger log = LoggerFactory.getLogger(MQRestAPIHelper.class);
+
+ public static final String STOP_CHANNEL = "{"
+ + " \"type\": \"runCommand\","
+ + " \"parameters\": {"
+ + " \"command\": \"STOP CHANNEL('DEV.APP.SVRCONN') MODE(QUIESCE)\""
+ + " }"
+ + "}";
+
+ public static final String START_CHANNEL = "{"
+ + " \"type\": \"runCommand\","
+ + " \"parameters\": {"
+ + " \"command\": \"START CHANNEL('DEV.APP.SVRCONN')\""
+ + " }"
+ + "}";
+
+ public int sendCommand(final String request) throws IOException, KeyManagementException, NoSuchAlgorithmException {
+ try {
+
+ final String url = "https://localhost:" + portnum + "/ibmmq/rest/v2/admin/action/qmgr/" + qmgrname
+ + "/mqsc";
+ final JSONObject commandResult = JsonRestApi.jsonPost(url, "admin", password, request);
+
+ log.debug("result = " + commandResult);
+
+ final int completionCode = commandResult.getInt("overallCompletionCode");
+ final int reasonCode = commandResult.getInt("overallReasonCode");
+
+ if (completionCode == 2 && reasonCode == 3008) {
+ return 0;
+ } else if (completionCode == 0 && reasonCode == 0) {
+ return commandResult.getJSONArray("commandResponse").length();
+ } else {
+ return -1;
+ }
+ } catch (final JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class MQRestAPIHelperBuilder {
+ private String qmgrname;
+ private int portnum;
+ private String password;
+
+ public MQRestAPIHelperBuilder qmgrname(final String qmgrname) {
+ this.qmgrname = qmgrname;
+ return this;
+ }
+
+ public MQRestAPIHelperBuilder portnum(final int portnum) {
+ this.portnum = portnum;
+ return this;
+ }
+
+ public MQRestAPIHelperBuilder password(final String password) {
+ this.password = password;
+ return this;
+ }
+
+ public MQRestAPIHelper build() {
+ return new MQRestAPIHelper(qmgrname, portnum, password);
+ }
+
+ }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/MessageDescriptorBuilder.java b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/MessageDescriptorBuilder.java
new file mode 100644
index 0000000..1651099
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/MessageDescriptorBuilder.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink.util;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder;
+import com.ibm.msg.client.jms.JmsConstants;
+
+public class MessageDescriptorBuilder extends DefaultMessageBuilder {
+
+ @Override
+ public Message getJMSMessage(JMSContext jmsCtxt, SinkRecord record) {
+
+ Message message = super.getJMSMessage(jmsCtxt, record);
+
+ // add MQMD values
+ // JMS_IBM_MQMD_MsgId - byte[]
+ // JMS_IBM_MQMD_ApplIdentityData - string
+ // JMS_IBM_MQMD_PutApplName - string
+ // https://www.ibm.com/docs/en/ibm-mq/9.3?topic=application-jms-message-object-properties
+ try {
+ message.setObjectProperty(JmsConstants.JMS_IBM_MQMD_MSGID, "ThisIsMyId".getBytes());
+ message.setStringProperty(JmsConstants.JMS_IBM_MQMD_APPLIDENTITYDATA, "ThisIsMyApplicationData");
+ message.setStringProperty(JmsConstants.JMS_IBM_MQMD_PUTAPPLNAME, "ThisIsMyPutApplicationName");
+
+ } catch (JMSException e) {
+ throw new ConnectException("Failed to write property", e);
+ }
+
+ return message;
+ }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/SinkRecordBuilderForTest.java b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/SinkRecordBuilderForTest.java
new file mode 100644
index 0000000..8c048d2
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsink/util/SinkRecordBuilderForTest.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink.util;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkRecordBuilderForTest {
+ private String topic;
+ private Integer partition;
+ private Schema keySchema;
+ private Object key;
+ private Schema valueSchema;
+ private Object value;
+ private Long offset;
+
+ public SinkRecordBuilderForTest() {
+ }
+
+ public SinkRecordBuilderForTest topic(final String topic) {
+ this.topic = topic;
+ return this;
+ }
+
+ public SinkRecordBuilderForTest partition(final Integer partition) {
+ this.partition = partition;
+ return this;
+ }
+
+ public SinkRecordBuilderForTest keySchema(final Schema keySchema) {
+ this.keySchema = keySchema;
+ return this;
+ }
+
+ public SinkRecordBuilderForTest key(final Object key) {
+ this.key = key;
+ return this;
+ }
+
+ public SinkRecordBuilderForTest valueSchema(final Schema valueSchema) {
+ this.valueSchema = valueSchema;
+ return this;
+ }
+
+ public SinkRecordBuilderForTest value(final Object value) {
+ this.value = value;
+ return this;
+ }
+
+ public SinkRecordBuilderForTest offset(final Long offset) {
+ this.offset = offset;
+ return this;
+ }
+
+ public SinkRecord build() {
+ return new SinkRecord(this.topic, this.partition, this.keySchema, this.key, this.valueSchema, this.value,
+ this.offset);
+ }
+
+ public String toString() {
+ return "SinkRecord.SinkRecordBuilder(topic=" + this.topic + ", partition=" + this.partition + ", keySchema="
+ + this.keySchema + ", key=" + this.key + ", valueSchema=" + this.valueSchema + ", value=" + this.value
+ + ")";
+ }
+}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/ExceptionProcessor.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/ExceptionProcessor.java
new file mode 100644
index 0000000..14e255e
--- /dev/null
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/ExceptionProcessor.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink;
+
+import javax.jms.JMSException;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ibm.mq.MQException;
+import com.ibm.mq.constants.MQConstants;
+
+public class ExceptionProcessor {
+
+ private static final Logger log = LoggerFactory.getLogger(ExceptionProcessor.class);
+
+ protected static int getReason(final Throwable exc) {
+ int reason = -1;
+
+ // Try to extract the MQ reason code to see if it's a retriable exception
+ Throwable t = exc.getCause();
+ while (t != null) {
+ if (t instanceof MQException) {
+ final MQException mqe = (MQException) t;
+ log.error("MQ error: CompCode {}, Reason {} {}", mqe.getCompCode(), mqe.getReason(),
+ MQConstants.lookupReasonCode(mqe.getReason()));
+ reason = mqe.getReason();
+ break;
+ } else if (t instanceof JMSException) {
+ final JMSException jmse = (JMSException) t;
+ log.error("JMS exception: error code {}", jmse.getErrorCode());
+ }
+
+ t = t.getCause(); // Moves t up the stack trace until it is null.
+ }
+ return reason;
+ }
+
+ public static boolean isClosable(final Throwable exc) {
+ final int reason = getReason(exc);
+ if (reason == MQConstants.MQRC_Q_FULL || reason == MQConstants.MQRC_PUT_INHIBITED) {
+ log.info("A queue has the GET operation intentionally inhibited, wait for next poll.");
+ return false;
+ }
+ log.info(" All MQ connections will be closed.");
+ return true;
+ }
+
+ public static boolean isRetriable(final Throwable exc) {
+ final int reason = getReason(exc);
+ switch (reason) {
+ // These reason codes indicate that the connection can be just retried later
+ // will probably recover
+ case MQConstants.MQRC_BACKED_OUT:
+ case MQConstants.MQRC_CHANNEL_NOT_AVAILABLE:
+ case MQConstants.MQRC_CONNECTION_BROKEN:
+ case MQConstants.MQRC_HOST_NOT_AVAILABLE:
+ case MQConstants.MQRC_NOT_AUTHORIZED:
+ case MQConstants.MQRC_Q_MGR_NOT_AVAILABLE:
+ case MQConstants.MQRC_Q_MGR_QUIESCING:
+ case MQConstants.MQRC_Q_MGR_STOPPING:
+ case MQConstants.MQRC_UNEXPECTED_ERROR:
+ case MQConstants.MQRC_Q_FULL:
+ case MQConstants.MQRC_PUT_INHIBITED:
+ log.info("JMS exception is retriable, wait for next poll.");
+ return true;
+ }
+ log.info("JMS exception is not retriable, the connector is in a failed state.");
+ return false;
+ }
+
+ /**
+ * Handles exceptions from MQ. Some JMS exceptions are treated as retriable
+ * meaning that the connector can keep running and just trying again is likely
+ * to fix things.
+ */
+ public static ConnectException handleException(final Throwable exc) {
+ if (isRetriable(exc)) {
+ return new RetriableException(exc);
+ }
+ return new ConnectException(exc);
+ }
+}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorker.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorker.java
new file mode 100644
index 0000000..da588ab
--- /dev/null
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorker.java
@@ -0,0 +1,395 @@
+/**
+ * Copyright 2017, 2020, 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink;
+
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import javax.jms.DeliveryMode;
+import javax.jms.InvalidDestinationRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
+import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilderFactory;
+import com.ibm.mq.jms.MQConnectionFactory;
+import com.ibm.mq.jms.MQQueue;
+import com.ibm.msg.client.wmq.WMQConstants;
+
+/**
+ * Writes messages to MQ using JMS. Uses a transacted session, adding messages
+ * to the current
+ * transaction until told to commit. Automatically reconnects as needed.
+ */
+public class JMSWorker {
+ private static final Logger log = LoggerFactory.getLogger(JMSWorker.class);
+
+ // JMS factory and context
+ private MQConnectionFactory mqConnFactory;
+ private JMSContext jmsCtxt;
+ protected JMSProducer jmsProd;
+ protected JMSConsumer jmsCons;
+
+ // MQ objects
+ private MQQueue queue;
+ protected MQQueue stateQueue;
+
+ // State
+ private boolean connected = false; // Whether connected to MQ
+ private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
+
+ // Constants
+ private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+
+ final private static long RECONNECT_DELAY_MILLIS_MIN = 64L;
+ final private static long RECONNECT_DELAY_MILLIS_MAX = 8192L;
+
+ protected ObjectMapper mapper;
+
+ private boolean isExactlyOnceMode = false;
+
+ private MQConnectionHelper mqConnectionHelper;
+
+ private MessageBuilder messageBuilder;
+
+ public JMSWorker() {
+ mapper = new ObjectMapper();
+ }
+
+ /**
+ * Configure this class.
+ *
+ * @param config
+ * @throws ConnectException
+ */
+ public void configure(final AbstractConfig config) throws ConnectException {
+ log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(),
+ this.getClass().getName());
+
+ isExactlyOnceMode = MQSinkConnector.configSupportsExactlyOnce(config);
+ mqConnectionHelper = new MQConnectionHelper(config);
+
+ if (mqConnectionHelper.getUseIBMCipherMappings() != null) {
+ System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", mqConnectionHelper.getUseIBMCipherMappings());
+ }
+
+ try {
+ mqConnFactory = mqConnectionHelper.createMQConnFactory();
+ queue = configureQueue(mqConnectionHelper.getQueueName(), mqConnectionHelper.isMessageBodyJms());
+ final Boolean mqmdWriteEnabled = config.getBoolean(MQSinkConfig.CONFIG_NAME_MQ_MQMD_WRITE_ENABLED);
+ queue.setBooleanProperty(WMQConstants.WMQ_MQMD_WRITE_ENABLED, mqmdWriteEnabled);
+
+ if (mqmdWriteEnabled) {
+ String mqmdMessageContext = config.getString(MQSinkConfig.CONFIG_NAME_MQ_MQMD_MESSAGE_CONTEXT);
+ if (mqmdMessageContext != null) {
+ mqmdMessageContext = mqmdMessageContext.toLowerCase(Locale.ENGLISH);
+ }
+ if ("identity".equals(mqmdMessageContext)) {
+ queue.setIntProperty(WMQConstants.WMQ_MQMD_MESSAGE_CONTEXT,
+ WMQConstants.WMQ_MDCTX_SET_IDENTITY_CONTEXT);
+ } else if ("all".equals(mqmdMessageContext)) {
+ queue.setIntProperty(WMQConstants.WMQ_MQMD_MESSAGE_CONTEXT, WMQConstants.WMQ_MDCTX_SET_ALL_CONTEXT);
+ }
+ }
+ if (isExactlyOnceMode) {
+ stateQueue = configureQueue(mqConnectionHelper.getStateQueueName(), true);
+ }
+
+ this.timeToLive = mqConnectionHelper.getTimeToLive();
+
+ this.deliveryMode = mqConnectionHelper.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ this.messageBuilder = MessageBuilderFactory.getMessageBuilder(config);
+ } catch (JMSException | JMSRuntimeException | MalformedURLException jmse) {
+ log.error("JMS exception {}", jmse);
+ throw new JMSWorkerConnectionException("JMS connection failed", jmse);
+ }
+
+ log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
+ }
+
+ /** Connects to MQ. */
+ public void connect() {
+ log.trace("[{}] Entry {}.connect", Thread.currentThread().getId(), this.getClass().getName());
+
+ createJMSContext();
+
+ createConsumerForStateQueue();
+
+ configureProducer();
+
+ connected = true;
+ log.info("Connection to MQ established");
+ log.trace("[{}] Exit {}.connect", Thread.currentThread().getId(), this.getClass().getName());
+ }
+
+ /**
+ * Internal method to connect to MQ.
+ *
+ * @throws RetriableException
+ * Operation failed, but connector should continue to
+ * retry.
+ * @throws ConnectException
+ * Operation failed and connector should stop.
+ */
+ private void maybeReconnect() throws ConnectException, RetriableException {
+ log.trace("[{}] Entry {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
+
+ if (connected) {
+ log.trace("[{}] Exit {}.maybeReconnect", Thread.currentThread().getId(), this.getClass().getName());
+ return;
+ }
+
+ try {
+ connect();
+ reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
+ } catch (final JMSRuntimeException jmse) {
+ // Delay slightly so that repeated reconnect loops don't run too fast
+ log.info("Connection to MQ could not be established");
+ try {
+ Thread.sleep(reconnectDelayMillis);
+ } catch (final InterruptedException ie) {
+ }
+
+ if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
+ reconnectDelayMillis = reconnectDelayMillis * 2;
+ }
+
+ log.error("JMS exception {}", jmse);
+ log.trace("[{}] Exit {}.maybeReconnect, retval=JMSRuntimeException", Thread.currentThread().getId(),
+ this.getClass().getName());
+ throw jmse;
+ }
+
+ log.trace("[{}] Exit {}.maybeReconnect, retval=true", Thread.currentThread().getId(),
+ this.getClass().getName());
+ }
+
+ /**
+ * Sends a message to MQ. Adds the message to the current transaction.
+ * Reconnects to MQ if required.
+ *
+ * @param r
+ * The message and schema to send
+ *
+ * @throws RetriableException
+ * Operation failed, but connector should continue to
+ * retry.
+ * @throws ConnectException
+ * Operation failed and connector should stop.
+ */
+ public void send(final SinkRecord r) throws ConnectException, RetriableException {
+ log.trace("[{}] Entry {}.send", Thread.currentThread().getId(), this.getClass().getName());
+
+ maybeReconnect();
+
+ sendSinkRecordToMQ(queue, r);
+
+ log.trace("[{}] Exit {}.send", Thread.currentThread().getId(), this.getClass().getName());
+ }
+
+ /** Closes the connection. */
+ public void close() {
+ log.trace("[{}] Entry {}.close", Thread.currentThread().getId(), this.getClass().getName());
+
+ try {
+ connected = false;
+
+ if (jmsCtxt != null) {
+ jmsCtxt.close();
+ }
+ } catch (final JMSRuntimeException jmse) {
+ log.error("", jmse);
+ } finally {
+ jmsCtxt = null;
+ log.debug("Connection to MQ closed");
+ }
+
+ log.trace("[{}] Exit {}.close", Thread.currentThread().getId(), this.getClass().getName());
+ }
+
+ /**
+ * Read a message from the state queue.
+ *
+ * @return the message
+ * @throws JsonProcessingException
+ * @throws JMSRuntimeException
+ * @throws JMSException
+ */
+ public Optional> readFromStateQueue()
+ throws JsonProcessingException, JMSRuntimeException, JMSException {
+ maybeReconnect();
+ if (jmsCons == null) {
+ return Optional.empty();
+ }
+ try {
+ final TextMessage message = (TextMessage) jmsCons.receiveNoWait();
+ if (message == null) {
+ return Optional.empty();
+ }
+ final HashMap stateMap = mapper.readValue(message.getText(),
+ new TypeReference>() {
+ });
+ return Optional.of(stateMap);
+ } catch (final JsonProcessingException jpe) {
+ log.error("An error occurred while processing (parsing) JSON content from state queue: {}",
+ jpe.getMessage());
+ throw jpe;
+ } catch (JMSException | JMSRuntimeException jmse) {
+ log.error("An error occurred while reading the state queue: {}", jmse.getMessage());
+ throw jmse;
+ }
+ }
+
+ /**
+ * Create a queue object. If mbj is true, then create a queue that supports
+ * JMS message body.
+ *
+ * @param queueName
+ * the name of the queue
+ * @param isJms
+ * whether the queue supports JMS message body
+ * @return the queue object
+ */
+ private MQQueue configureQueue(final String queueName, final Boolean isJms)
+ throws JMSException {
+ final MQQueue queue = new MQQueue(queueName);
+ queue.setMessageBodyStyle(isJms ? WMQConstants.WMQ_MESSAGE_BODY_JMS : WMQConstants.WMQ_MESSAGE_BODY_MQ);
+ return queue;
+ }
+
+ /**
+ * Send the last message to the MQ queue.
+ *
+ * @param lastCommittedOffsetMap
+ * @throws Exception
+ *
+ * @throws RetriableException
+ * Operation failed, but connector should
+ * continue to
+ * retry.
+ * @throws ConnectException
+ * Operation failed and connector should stop.
+ * @throws JMSException
+ * @throws JsonProcessingException
+ */
+ public void writeLastRecordOffsetToStateQueue(final Map lastCommittedOffsetMap)
+ throws JsonProcessingException, JMSRuntimeException, JMSException {
+ log.trace("[{}] Entry {}.writeLastRecordOffsetToStateQueue", Thread.currentThread().getId(),
+ this.getClass().getName());
+
+ maybeReconnect();
+
+ if (lastCommittedOffsetMap == null) {
+ log.error("Last committed offset map is null");
+ log.trace("[{}] Exit {}.writeLastRecordOffsetToStateQueue", Thread.currentThread().getId(),
+ this.getClass().getName());
+ return;
+ }
+
+ final TextMessage message = jmsCtxt.createTextMessage();
+ try {
+ message.setText(mapper.writeValueAsString(lastCommittedOffsetMap));
+ jmsProd.send(stateQueue, message);
+ } catch (final JsonProcessingException jpe) {
+ log.error("An error occurred while writing to the state queue, Json Processing Exception {}", jpe);
+ throw jpe;
+ } catch (JMSRuntimeException | JMSException jmse) {
+ log.error("An error occurred while writing to the state queue, JMS Exception {}", jmse);
+ throw jmse;
+ }
+ log.trace("[{}] Exit {}.writeLastRecordOffsetToStateQueue", Thread.currentThread().getId(),
+ this.getClass().getName());
+ }
+
+ /**
+ * Commits the current transaction.
+ *
+ * @throws RetriableException
+ * Operation failed, but connector should continue to
+ * retry.
+ * @throws ConnectException
+ * Operation failed and connector should stop.
+ */
+ public void commit() {
+ log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName());
+
+ maybeReconnect();
+
+ jmsCtxt.commit();
+ log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
+ }
+
+ /**
+ * Builds the JMS message and sends it to MQ.
+ *
+ * @param queue
+ * The MQ queue to send the message to
+ * @param record
+ * The message and schema to send
+ * @throws JMSException
+ */
+ private void sendSinkRecordToMQ(final MQQueue queue, final SinkRecord record) {
+ maybeReconnect();
+ final Message m = messageBuilder.fromSinkRecord(jmsCtxt, record);
+ jmsProd.send(queue, m);
+ }
+
+ protected void createJMSContext() {
+ if (mqConnectionHelper.getUserName() != null) {
+ jmsCtxt = mqConnFactory.createContext(mqConnectionHelper.getUserName(), mqConnectionHelper.getPassword().value(),
+ JMSContext.SESSION_TRANSACTED);
+ } else {
+ jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
+ }
+ }
+
+ protected void configureProducer() {
+ jmsProd = jmsCtxt.createProducer();
+ jmsProd.setDeliveryMode(deliveryMode);
+ jmsProd.setTimeToLive(timeToLive);
+ }
+
+ protected void createConsumerForStateQueue() {
+ if (stateQueue != null) {
+ try {
+ jmsCons = jmsCtxt.createConsumer(stateQueue);
+ } catch (final InvalidDestinationRuntimeException e) {
+ log.error("An invalid state queue is specified.", e);
+ throw e;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorkerConnectionException.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorkerConnectionException.java
new file mode 100644
index 0000000..1566b09
--- /dev/null
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorkerConnectionException.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink;
+
+public class JMSWorkerConnectionException extends RuntimeException {
+
+ public JMSWorkerConnectionException(final String message) {
+ super(message);
+ }
+
+ public JMSWorkerConnectionException(final String message, final Throwable exc) {
+ super(message, exc);
+ }
+
+}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java
deleted file mode 100644
index 1aff03c..0000000
--- a/src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWriter.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Copyright 2017, 2020 IBM Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.eventstreams.connect.mqsink;
-
-import com.ibm.mq.MQException;
-import com.ibm.mq.constants.MQConstants;
-import com.ibm.mq.jms.MQConnectionFactory;
-import com.ibm.mq.jms.MQQueue;
-import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
-import com.ibm.msg.client.wmq.WMQConstants;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.security.SecureRandom;
-import java.util.Map;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSContext;
-import javax.jms.JMSException;
-import javax.jms.JMSProducer;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
-import org.apache.kafka.connect.sink.SinkRecord;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Writes messages to MQ using JMS. Uses a transacted session, adding messages to the current
- * transaction until told to commit. Automatically reconnects as needed.
- */
-public class JMSWriter {
- private static final Logger log = LoggerFactory.getLogger(JMSWriter.class);
-
- // Configs
- private String userName;
- private String password;
-
- // JMS factory and context
- private MQConnectionFactory mqConnFactory;
- private JMSContext jmsCtxt;
- private JMSProducer jmsProd;
- private MQQueue queue;
- private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
- private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
-
- private MessageBuilder builder;
-
- private boolean connected = false; // Whether connected to MQ
- private boolean inflight = false; // Whether messages in-flight in current transaction
- private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts
-
- final private static long RECONNECT_DELAY_MILLIS_MIN = 64L;
- final private static long RECONNECT_DELAY_MILLIS_MAX = 8192L;
-
- public JMSWriter() {
- }
-
- /**
- * Configure this class.
- *
- * @param props initial configuration
- *
- * @throws ConnectException Operation failed and connector should stop.
- */
- public void configure(final Map props) {
- log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props);
-
- final String queueManager = props.get(MQSinkConnector.CONFIG_NAME_MQ_QUEUE_MANAGER);
- final String connectionMode = props.get(MQSinkConnector.CONFIG_NAME_MQ_CONNECTION_MODE);
- final String connectionNameList = props.get(MQSinkConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST);
- final String channelName = props.get(MQSinkConnector.CONFIG_NAME_MQ_CHANNEL_NAME);
- final String queueName = props.get(MQSinkConnector.CONFIG_NAME_MQ_QUEUE);
- final String userName = props.get(MQSinkConnector.CONFIG_NAME_MQ_USER_NAME);
- final String password = props.get(MQSinkConnector.CONFIG_NAME_MQ_PASSWORD);
- final String ccdtUrl = props.get(MQSinkConnector.CONFIG_NAME_MQ_CCDT_URL);
- final String builderClass = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BUILDER);
- final String mbj = props.get(MQSinkConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
- final String timeToLive = props.get(MQSinkConnector.CONFIG_NAME_MQ_TIME_TO_LIVE);
- final String persistent = props.get(MQSinkConnector.CONFIG_NAME_MQ_PERSISTENT);
- final String sslCipherSuite = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
- final String sslPeerName = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
- final String sslKeystoreLocation = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
- final String sslKeystorePassword = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
- final String sslTruststoreLocation = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
- final String sslTruststorePassword = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
- final String useMQCSP = props.get(MQSinkConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
- final String useIBMCipherMappings = props.get(MQSinkConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
-
- if (useIBMCipherMappings != null) {
- System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
- }
-
- int transportType = WMQConstants.WMQ_CM_CLIENT;
- if (connectionMode != null) {
- if (connectionMode.equals(MQSinkConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
- transportType = WMQConstants.WMQ_CM_CLIENT;
- } else if (connectionMode.equals(MQSinkConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
- transportType = WMQConstants.WMQ_CM_BINDINGS;
- } else {
- log.error("Unsupported MQ connection mode {}", connectionMode);
- throw new ConnectException("Unsupported MQ connection mode");
- }
- }
-
- try {
- mqConnFactory = new MQConnectionFactory();
- mqConnFactory.setTransportType(transportType);
- mqConnFactory.setQueueManager(queueManager);
- mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
- if (useMQCSP != null) {
- mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, Boolean.parseBoolean(useMQCSP));
- }
-
- if (transportType == WMQConstants.WMQ_CM_CLIENT) {
- if (ccdtUrl != null) {
- final URL ccdtUrlObject;
- try {
- ccdtUrlObject = new URL(ccdtUrl);
- } catch (final MalformedURLException e) {
- log.error("MalformedURLException exception {}", e);
- throw new ConnectException("CCDT file url invalid", e);
- }
- mqConnFactory.setCCDTURL(ccdtUrlObject);
- } else {
- mqConnFactory.setConnectionNameList(connectionNameList);
- mqConnFactory.setChannel(channelName);
- }
-
- if (sslCipherSuite != null) {
- mqConnFactory.setSSLCipherSuite(sslCipherSuite);
- if (sslPeerName != null) {
- mqConnFactory.setSSLPeerName(sslPeerName);
- }
- }
-
- if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
- final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword, sslTruststoreLocation, sslTruststorePassword);
- mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
- }
- }
-
- queue = new MQQueue(queueName);
-
- this.userName = userName;
- this.password = password;
-
- queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
- if (mbj != null) {
- if (Boolean.parseBoolean(mbj)) {
- queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS);
- }
- }
-
- if (timeToLive != null) {
- this.timeToLive = Long.parseLong(timeToLive);
- }
- if (persistent != null) {
- this.deliveryMode = Boolean.parseBoolean(persistent) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
- }
- } catch (JMSException | JMSRuntimeException jmse) {
- log.error("JMS exception {}", jmse);
- throw new ConnectException(jmse);
- }
-
- try {
- final Class extends MessageBuilder> c = Class.forName(builderClass).asSubclass(MessageBuilder.class);
- builder = c.newInstance();
- builder.configure(props);
- } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) {
- log.error("Could not instantiate message builder {}", builderClass);
- throw new ConnectException("Could not instantiate message builder", exc);
- }
-
- log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
- }
-
- /**
- * Connects to MQ.
- */
- public void connect() {
- log.trace("[{}] Entry {}.connect", Thread.currentThread().getId(), this.getClass().getName());
-
- try {
- if (userName != null) {
- jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
- } else {
- jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
- }
-
- jmsProd = jmsCtxt.createProducer();
- jmsProd.setDeliveryMode(deliveryMode);
- jmsProd.setTimeToLive(timeToLive);
- connected = true;
-
- log.info("Connection to MQ established");
- } catch (final JMSRuntimeException jmse) {
- log.info("Connection to MQ could not be established");
- log.error("JMS exception {}", jmse);
- handleException(jmse);
- }
-
- log.trace("[{}] Exit {}.connect", Thread.currentThread().getId(), this.getClass().getName());
- }
-
- /**
- * Sends a message to MQ. Adds the message to the current transaction. Reconnects to MQ if required.
- *
- * @param r The message and schema to send
- *
- * @throws RetriableException Operation failed, but connector should continue to retry.
- * @throws ConnectException Operation failed and connector should stop.
- */
- public void send(final SinkRecord r) throws ConnectException, RetriableException {
- log.trace("[{}] Entry {}.send", Thread.currentThread().getId(), this.getClass().getName());
-
- connectInternal();
-
- try {
- final Message m = builder.fromSinkRecord(jmsCtxt, r);
- inflight = true;
- jmsProd.send(queue, m);
- } catch (final JMSRuntimeException jmse) {
- log.error("JMS exception {}", jmse);
- throw handleException(jmse);
- }
-
- log.trace("[{}] Exit {}.send", Thread.currentThread().getId(), this.getClass().getName());
- }
-
-
- /**
- * Commits the current transaction.
- *
- * @throws RetriableException Operation failed, but connector should continue to retry.
- * @throws ConnectException Operation failed and connector should stop.
- */
- public void commit() throws ConnectException, RetriableException {
- log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName());
-
- connectInternal();
- try {
- if (inflight) {
- inflight = false;
- }
-
- jmsCtxt.commit();
- } catch (final JMSRuntimeException jmse) {
- log.error("JMS exception {}", jmse);
- throw handleException(jmse);
- }
-
- log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
- }
-
- /**
- * Closes the connection.
- */
- public void close() {
- log.trace("[{}] Entry {}.close", Thread.currentThread().getId(), this.getClass().getName());
-
- try {
- inflight = false;
- connected = false;
-
- if (jmsCtxt != null) {
- jmsCtxt.close();
- }
- } catch (final JMSRuntimeException jmse) {
- jmse.printStackTrace();
- } finally {
- jmsCtxt = null;
- log.debug("Connection to MQ closed");
- }
-
- log.trace("[{}] Exit {}.close", Thread.currentThread().getId(), this.getClass().getName());
- }
-
- /**
- * Internal method to connect to MQ.
- *
- * @throws RetriableException Operation failed, but connector should continue to retry.
- * @throws ConnectException Operation failed and connector should stop.
- */
- private void connectInternal() throws ConnectException, RetriableException {
- log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
-
- if (connected) {
- return;
- }
-
- try {
- if (userName != null) {
- jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
- } else {
- jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
- }
-
- jmsProd = jmsCtxt.createProducer();
- jmsProd.setDeliveryMode(deliveryMode);
- jmsProd.setTimeToLive(timeToLive);
- reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN;
- connected = true;
- } catch (final JMSRuntimeException jmse) {
- // Delay slightly so that repeated reconnect loops don't run too fast
- try {
- Thread.sleep(reconnectDelayMillis);
- } catch (final InterruptedException e) {
- e.printStackTrace();
- }
-
- if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) {
- reconnectDelayMillis = reconnectDelayMillis * 2;
- }
-
- log.error("JMS exception {}", jmse);
- throw handleException(jmse);
- }
-
- log.trace("[{}] Exit {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
- }
-
- /**
- * Handles exceptions from MQ. Some JMS exceptions are treated as retriable meaning that the
- * connector can keep running and just trying again is likely to fix things.
- */
- private ConnectException handleException(final Throwable exc) {
- boolean isRetriable = false;
- boolean mustClose = true;
- int reason = -1;
-
- // Try to extract the MQ reason code to see if it's a retriable exception
- Throwable t = exc.getCause();
- while (t != null) {
- if (t instanceof MQException) {
- final MQException mqe = (MQException) t;
- log.error("MQ error: CompCode {}, Reason {}", mqe.getCompCode(), mqe.getReason());
- reason = mqe.getReason();
- break;
- }
- t = t.getCause();
- }
-
- switch (reason) {
- // These reason codes indicate that the connection needs to be closed, but just retrying later
- // will probably recover
- case MQConstants.MQRC_BACKED_OUT:
- case MQConstants.MQRC_CHANNEL_NOT_AVAILABLE:
- case MQConstants.MQRC_CONNECTION_BROKEN:
- case MQConstants.MQRC_HOST_NOT_AVAILABLE:
- case MQConstants.MQRC_NOT_AUTHORIZED:
- case MQConstants.MQRC_Q_MGR_NOT_AVAILABLE:
- case MQConstants.MQRC_Q_MGR_QUIESCING:
- case MQConstants.MQRC_Q_MGR_STOPPING:
- case MQConstants.MQRC_UNEXPECTED_ERROR:
- isRetriable = true;
- break;
-
- // These reason codes indicates that the connect is still OK, but just retrying later
- // will probably recover - possibly with administrative action on the queue manager
- case MQConstants.MQRC_Q_FULL:
- case MQConstants.MQRC_PUT_INHIBITED:
- isRetriable = true;
- mustClose = false;
- break;
- }
-
- if (mustClose) {
- close();
- }
-
- if (isRetriable) {
- return new RetriableException(exc);
- }
-
- return new ConnectException(exc);
- }
-
- private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword, final String sslTruststoreLocation, final String sslTruststorePassword) {
- log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());
-
- try {
- KeyManager[] keyManagers = null;
- TrustManager[] trustManagers = null;
-
- if (sslKeystoreLocation != null) {
- final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
- keyManagers = kmf.getKeyManagers();
- }
-
- if (sslTruststoreLocation != null) {
- final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- tmf.init(loadKeyStore(sslTruststoreLocation, sslTruststorePassword));
- trustManagers = tmf.getTrustManagers();
- }
-
- final SSLContext sslContext = SSLContext.getInstance("TLS");
- sslContext.init(keyManagers, trustManagers, new SecureRandom());
-
- log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sslContext);
- return sslContext;
- } catch (final GeneralSecurityException e) {
- throw new ConnectException("Error creating SSLContext", e);
- }
- }
-
- private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException {
- log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName());
-
- try (final InputStream ksStr = new FileInputStream(location)) {
- final KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(ksStr, password.toCharArray());
-
- log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks);
- return ks;
- } catch (final IOException e) {
- throw new ConnectException("Error reading keystore " + location, e);
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQConnectionHelper.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQConnectionHelper.java
new file mode 100644
index 0000000..cf0b8e0
--- /dev/null
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQConnectionHelper.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import javax.jms.JMSException;
+import javax.net.ssl.SSLContext;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import com.ibm.mq.jms.MQConnectionFactory;
+import com.ibm.msg.client.wmq.WMQConstants;
+
+public class MQConnectionHelper {
+ private AbstractConfig config;
+
+ public String getQueueManagerName() {
+ return config.getString(MQSinkConfig.CONFIG_NAME_MQ_QUEUE_MANAGER);
+ }
+
+ public String getQueueName() {
+ return config.getString(MQSinkConfig.CONFIG_NAME_MQ_QUEUE);
+ }
+
+ public String getStateQueueName() {
+ return config.getString(MQSinkConfig.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE);
+ }
+
+ public String getUserName() {
+ return config.getString(MQSinkConfig.CONFIG_NAME_MQ_USER_NAME);
+ }
+
+ public Password getPassword() {
+ return config.getPassword(MQSinkConfig.CONFIG_NAME_MQ_PASSWORD);
+ }
+
+ public boolean isMessageBodyJms() {
+ return config.getBoolean(MQSinkConfig.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
+ }
+
+ public Long getTimeToLive() {
+ return config.getLong(MQSinkConfig.CONFIG_NAME_MQ_TIME_TO_LIVE);
+ }
+
+ public Boolean isPersistent() {
+ return config.getBoolean(MQSinkConfig.CONFIG_NAME_MQ_PERSISTENT);
+ }
+
+ public String getUseIBMCipherMappings() {
+ return config.getString(MQSinkConfig.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
+ }
+
+ public int getTransportType() {
+ return getTransportType(config.getString(MQSinkConfig.CONFIG_NAME_MQ_CONNECTION_MODE));
+ }
+
+ public MQConnectionHelper(final AbstractConfig config) {
+ this.config = config;
+ }
+
+ /**
+ * Get the transport type from the connection mode.
+ *
+ * @param connectionMode
+ * the connection mode
+ * @return the transport type
+ * @throws ConnectException
+ * if the connection mode is not supported
+ */
+ public static int getTransportType(final String connectionMode) {
+ int transportType = WMQConstants.WMQ_CM_CLIENT;
+ if (connectionMode != null) {
+ if (connectionMode.equals(MQSinkConfig.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
+ transportType = WMQConstants.WMQ_CM_CLIENT;
+ } else if (connectionMode.equals(MQSinkConfig.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
+ transportType = WMQConstants.WMQ_CM_BINDINGS;
+ }
+ }
+ return transportType;
+ }
+
+ /**
+ * * Create a MQ connection factory.
+ *
+ * The connection factory is configured with the supplied properties.
+ *
+ * @return
+ * @throws JMSException
+ */
+ public MQConnectionFactory createMQConnFactory() throws JMSException, MalformedURLException {
+ final MQConnectionFactory mqConnFactory = new MQConnectionFactory();
+ final int transportType = getTransportType();
+ mqConnFactory.setTransportType(transportType);
+ mqConnFactory.setQueueManager(getQueueManagerName());
+ mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
+ config.getBoolean(MQSinkConfig.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP));
+
+ if (transportType == WMQConstants.WMQ_CM_CLIENT) {
+ final String ccdtUrl = config.getString(MQSinkConfig.CONFIG_NAME_MQ_CCDT_URL);
+ if (ccdtUrl != null) {
+ mqConnFactory.setCCDTURL(new URL(ccdtUrl));
+ } else {
+ mqConnFactory.setConnectionNameList(config.getString(MQSinkConfig.CONFIG_NAME_MQ_CONNECTION_NAME_LIST));
+ mqConnFactory.setChannel(config.getString(MQSinkConfig.CONFIG_NAME_MQ_CHANNEL_NAME));
+ }
+
+ mqConnFactory.setSSLCipherSuite(config.getString(MQSinkConfig.CONFIG_NAME_MQ_SSL_CIPHER_SUITE));
+ mqConnFactory.setSSLPeerName(config.getString(MQSinkConfig.CONFIG_NAME_MQ_SSL_PEER_NAME));
+
+ final String sslKeystoreLocation = config.getString(MQSinkConfig.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
+ final String sslTruststoreLocation = config.getString(MQSinkConfig.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
+ if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
+ final SSLContext sslContext = new SSLContextBuilder().buildSslContext(sslKeystoreLocation,
+ config.getPassword(MQSinkConfig.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD), sslTruststoreLocation, config.getPassword(MQSinkConfig.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD));
+ mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
+ }
+ }
+ return mqConnFactory;
+ }
+}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConfig.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConfig.java
new file mode 100644
index 0000000..9795ddb
--- /dev/null
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConfig.java
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2023, 2024 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsink;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import com.ibm.eventstreams.connect.mqsink.builders.MessageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Range;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigException;
+
+public class MQSinkConfig {
+
+ public static final Logger log = LoggerFactory.getLogger(MQSinkConfig.class);
+
+ public static final String CONFIG_GROUP_MQ = "mq";
+
+ public static final String CONFIG_NAME_MQ_QUEUE_MANAGER = "mq.queue.manager";
+ public static final String CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER = "The name of the MQ queue manager.";
+ public static final String CONFIG_DISPLAY_MQ_QUEUE_MANAGER = "Queue manager";
+
+ public static final String CONFIG_NAME_MQ_CONNECTION_MODE = "mq.connection.mode";
+ public static final String CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE = "The connection mode - bindings or client.";
+ public static final String CONFIG_DISPLAY_MQ_CONNECTION_MODE = "Connection mode";
+ public static final String CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT = "client";
+ public static final String CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS = "bindings";
+
+ public static final String CONFIG_NAME_MQ_CONNECTION_NAME_LIST = "mq.connection.name.list";
+ public static final String CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST = "A list of one or more host(port) entries for connecting to the queue manager. Entries are separated with a comma.";
+ public static final String CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST = "List of connection names for queue manager";
+
+ public static final String CONFIG_NAME_MQ_CHANNEL_NAME = "mq.channel.name";
+ public static final String CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME = "The name of the server-connection channel.";
+ public static final String CONFIG_DISPLAY_MQ_CHANNEL_NAME = "Channel name";
+
+ public static final String CONFIG_NAME_MQ_QUEUE = "mq.queue";
+ public static final String CONFIG_DOCUMENTATION_MQ_QUEUE = "The name of the target MQ queue.";
+ public static final String CONFIG_DISPLAY_MQ_QUEUE = "Target queue";
+
+ public static final String CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE = "mq.exactly.once.state.queue";
+ public static final String CONFIG_DOCUMENTATION_MQ_EXACTLY_ONCE_STATE_QUEUE = "The name of the MQ queue used to store the state of the connector when exactly-once delivery is enabled.";
+ public static final String CONFIG_DISPLAY_MQ_EXACTLY_ONCE_STATE_QUEUE = "Exactly-once state queue";
+
+ public static final String CONFIG_NAME_MQ_USER_NAME = "mq.user.name";
+ public static final String CONFIG_DOCUMENTATION_MQ_USER_NAME = "The user name for authenticating with the queue manager.";
+ public static final String CONFIG_DISPLAY_MQ_USER_NAME = "User name";
+
+ public static final String CONFIG_NAME_MQ_PASSWORD = "mq.password";
+ public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
+ public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";
+
+ public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
+ public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
+ public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER = "mq.message.builder";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER = "The class used to build the MQ messages.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER = "Message builder";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BODY_JMS = "mq.message.body.jms";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS = "Whether to generate the message body as a JMS message type.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
+
+ public static final String CONFIG_NAME_MQ_TIME_TO_LIVE = "mq.time.to.live";
+ public static final String CONFIG_DOCUMENTATION_MQ_TIME_TO_LIVE = "Time-to-live in milliseconds for messages sent to MQ.";
+ public static final String CONFIG_DISPLAY_MQ_TIME_TO_LIVE = "Message time-to-live (ms)";
+
+ public static final String CONFIG_NAME_MQ_PERSISTENT = "mq.persistent";
+ public static final String CONFIG_DOCUMENTATION_MQ_PERSISTENT = "Send persistent or non-persistent messages to MQ.";
+ public static final String CONFIG_DISPLAY_MQ_PERSISTENT = "Send persistent messages";
+
+ public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for the TLS (SSL) connection.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE = "SSL cipher suite";
+
+ public static final String CONFIG_NAME_MQ_SSL_PEER_NAME = "mq.ssl.peer.name";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";
+
+ public static final String CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION = "mq.ssl.keystore.location";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION = "The path to the JKS keystore to use for the TLS (SSL) connection.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION = "SSL keystore location";
+
+ public static final String CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD = "mq.ssl.keystore.password";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD = "The password of the JKS keystore to use for the TLS (SSL) connection.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD = "SSL keystore password";
+
+ public static final String CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION = "mq.ssl.truststore.location";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION = "The path to the JKS truststore to use for the TLS (SSL) connection.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION = "SSL truststore location";
+
+ public static final String CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD = "mq.ssl.truststore.password";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD = "The password of the JKS truststore to use for the TLS (SSL) connection.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD = "SSL truststore password";
+
+ public static final String CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "mq.ssl.use.ibm.cipher.mappings";
+ public static final String CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Whether to set system property to control use of IBM cipher mappings.";
+ public static final String CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Use IBM cipher mappings";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_KEY_HEADER = "mq.message.builder.key.header";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_KEY_HEADER = "The JMS message header to set from the Kafka record key.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_KEY_HEADER = "Record builder key header";
+ public static final String CONFIG_VALUE_MQ_MESSAGE_BUILDER_KEY_HEADER_JMSCORRELATIONID = "JMSCorrelationID";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_VALUE_CONVERTER = "mq.message.builder.value.converter";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_VALUE_CONVERTER = "Prefix for configuring message builder's value converter.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_VALUE_CONVERTER = "Message builder's value converter";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY = "mq.message.builder.topic.property";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY = "The JMS message property to set from the Kafka topic.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY = "Kafka topic message property";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY = "mq.message.builder.partition.property";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY = "The JMS message property to set from the Kafka partition.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY = "Kafka partition message property";
+
+ public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY = "mq.message.builder.offset.property";
+ public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY = "The JMS message property to set from the Kafka offset.";
+ public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY = "Kafka offset message property";
+
+ public static final String CONFIG_NAME_MQ_REPLY_QUEUE = "mq.reply.queue";
+ public static final String CONFIG_DOCUMENTATION_MQ_REPLY_QUEUE = "The name of the reply-to queue, as a queue name or URI.";
+ public static final String CONFIG_DISPLAY_MQ_REPLY_QUEUE = "Reply-to queue";
+
+ public static final String CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP = "mq.user.authentication.mqcsp";
+ public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP).";
+ public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP";
+
+ public static final String CONFIG_NAME_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "mq.kafka.headers.copy.to.jms.properties";
+ public static final String CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Whether to copy Kafka headers to JMS message properties.";
+ public static final String CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Copy Kafka headers to JMS message properties";
+
+ public static final String CONFIG_NAME_MQ_RETRY_BACKOFF_MS = "mq.retry.backoff.ms";
+ public static final String CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS = "Time to wait, in milliseconds, before retrying after retriable exceptions";
+ public static final String CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS = "Retry backoff (ms)";
+
+ // https://www.ibm.com/docs/en/ibm-mq/9.3?topic=amffmcja-reading-writing-message-descriptor-from-mq-classes-jms-application
+ public static final String CONFIG_NAME_MQ_MQMD_WRITE_ENABLED = "mq.message.mqmd.write";
+ public static final String CONFIG_DISPLAY_MQ_MQMD_WRITE_ENABLED = "Enable a custom message builder to write MQ message descriptors";
+ public static final String CONFIG_DOCUMENTATION_MQ_MQMD_WRITE_ENABLED = "This configuration option determines whether the MQMD structure will be written along with the message data. Enabling this option allows control information to accompany the application data during message transmission between sending and receiving applications. Disabling this option will exclude the MQMD structure from the message payload.";
+
+ // https://www.ibm.com/docs/en/ibm-mq/9.3?topic=application-jms-message-object-properties
+ public static final String CONFIG_NAME_MQ_MQMD_MESSAGE_CONTEXT = "mq.message.mqmd.context";
+ public static final String CONFIG_DISPLAY_MQ_MQMD_MESSAGE_CONTEXT = "Message context to set on the destination queue. This is required when setting some message descriptors.";
+ public static final String CONFIG_DOCUMENTATION_MQ_MQMD_MESSAGE_CONTEXT = "This configuration option specifies the context in which MQMD properties are applied. Certain properties require this context to be set appropriately for them to take effect. Valid options for WMQ_MQMD_MESSAGE_CONTEXT are IDENTITY for WMQ_MDCTX_SET_IDENTITY_CONTEXT or ALL for WMQ_MDCTX_SET_ALL_CONTEXT.";
+
+ private static final Validator ANY_VALUE_VALID = null;
+
+ public static ConfigDef config() {
+ final ConfigDef config = new ConfigDef();
+
+ config.define(CONFIG_NAME_MQ_QUEUE_MANAGER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), Importance.HIGH, CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM, CONFIG_DISPLAY_MQ_QUEUE_MANAGER);
+
+ config.define(CONFIG_NAME_MQ_CONNECTION_MODE, Type.STRING, CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE, CONFIG_GROUP_MQ, 2, Width.SHORT, CONFIG_DISPLAY_MQ_CONNECTION_MODE);
+
+ config.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, Type.STRING, null, ANY_VALUE_VALID, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, CONFIG_GROUP_MQ, 3, Width.LONG, CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST);
+
+ config.define(CONFIG_NAME_MQ_CHANNEL_NAME, Type.STRING, null, ANY_VALUE_VALID, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, CONFIG_GROUP_MQ, 4, Width.MEDIUM, CONFIG_DISPLAY_MQ_CHANNEL_NAME);
+
+ config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, new ValidURL(), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 5, Width.MEDIUM, CONFIG_DISPLAY_MQ_CCDT_URL);
+
+ config.define(CONFIG_NAME_MQ_QUEUE, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), Importance.HIGH, CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 6, Width.LONG, CONFIG_DISPLAY_MQ_QUEUE);
+
+ config.define(CONFIG_NAME_MQ_USER_NAME, Type.STRING, null, ANY_VALUE_VALID, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 7, Width.MEDIUM, CONFIG_DISPLAY_MQ_USER_NAME);
+
+ config.define(CONFIG_NAME_MQ_PASSWORD, Type.PASSWORD, null, ANY_VALUE_VALID, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 8, Width.MEDIUM, CONFIG_DISPLAY_MQ_PASSWORD);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ValidClass(), Importance.HIGH, CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER, CONFIG_GROUP_MQ, 9, Width.MEDIUM, CONFIG_DISPLAY_MQ_MESSAGE_BUILDER);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, Type.BOOLEAN, Boolean.FALSE, new ConfigDef.NonNullValidator(), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 10, Width.SHORT, CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
+
+ config.define(CONFIG_NAME_MQ_TIME_TO_LIVE, Type.LONG, 0, Range.between(0L, 99999999900L), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_TIME_TO_LIVE, CONFIG_GROUP_MQ, 11, Width.SHORT, CONFIG_DISPLAY_MQ_TIME_TO_LIVE);
+
+ config.define(CONFIG_NAME_MQ_PERSISTENT, Type.BOOLEAN, Boolean.TRUE, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_PERSISTENT, CONFIG_GROUP_MQ, 12, Width.SHORT, CONFIG_DISPLAY_MQ_PERSISTENT);
+
+ config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, ANY_VALUE_VALID, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 13, Width.MEDIUM, CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
+
+ config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, ANY_VALUE_VALID, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 14, Width.MEDIUM, CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
+
+ config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, Type.STRING, null, new ValidFileLocation(), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, CONFIG_GROUP_MQ, 15, Width.MEDIUM, CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION);
+
+ config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, CONFIG_GROUP_MQ, 16, Width.MEDIUM, CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD);
+
+ config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, new ValidFileLocation(), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, CONFIG_GROUP_MQ, 17, Width.MEDIUM, CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION);
+
+ config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, CONFIG_GROUP_MQ, 18, Width.MEDIUM, CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BUILDER_KEY_HEADER, Type.STRING, null, ConfigDef.ValidString.in(null, CONFIG_VALUE_MQ_MESSAGE_BUILDER_KEY_HEADER_JMSCORRELATIONID), Importance.MEDIUM, CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 19, Width.MEDIUM, CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_KEY_HEADER);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BUILDER_VALUE_CONVERTER, Type.STRING, null, new ValidClass(), Importance.LOW, CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_VALUE_CONVERTER, CONFIG_GROUP_MQ, 20, Width.MEDIUM, CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_VALUE_CONVERTER);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY, Type.STRING, null, ANY_VALUE_VALID, Importance.LOW, CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY, CONFIG_GROUP_MQ, 21, Width.MEDIUM, CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY, Type.STRING, null, ANY_VALUE_VALID, Importance.LOW, CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY, CONFIG_GROUP_MQ, 22, Width.MEDIUM, CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY);
+
+ config.define(CONFIG_NAME_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY, Type.STRING, null, ANY_VALUE_VALID, Importance.LOW, CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY, CONFIG_GROUP_MQ, 23, Width.MEDIUM, CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY);
+
+ config.define(CONFIG_NAME_MQ_REPLY_QUEUE, Type.STRING, null, ANY_VALUE_VALID, Importance.LOW, CONFIG_DOCUMENTATION_MQ_REPLY_QUEUE, CONFIG_GROUP_MQ, 24, Width.MEDIUM, CONFIG_DISPLAY_MQ_REPLY_QUEUE);
+
+ config.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, Type.BOOLEAN, Boolean.TRUE, Importance.LOW, CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 25, Width.SHORT, CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
+
+ config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW, CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 26, Width.SHORT, CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
+
+ config.define(CONFIG_NAME_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES, Type.BOOLEAN, Boolean.FALSE, Importance.LOW, CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES, CONFIG_GROUP_MQ, 27, Width.SHORT, CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES);
+
+ config.define(CONFIG_NAME_MQ_RETRY_BACKOFF_MS, Type.LONG, 60000, Range.between(0L, 99999999900L), Importance.LOW, CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS, CONFIG_GROUP_MQ, 28, Width.SHORT, CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS);
+
+ config.define(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE, Type.STRING, null, ANY_VALUE_VALID, Importance.LOW, CONFIG_DOCUMENTATION_MQ_EXACTLY_ONCE_STATE_QUEUE, CONFIG_GROUP_MQ, 29, Width.LONG, CONFIG_DISPLAY_MQ_EXACTLY_ONCE_STATE_QUEUE);
+
+ config.define(CONFIG_NAME_MQ_MQMD_WRITE_ENABLED, Type.BOOLEAN, false, Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_MQMD_WRITE_ENABLED, CONFIG_GROUP_MQ, 30, Width.LONG,
+ CONFIG_DISPLAY_MQ_MQMD_WRITE_ENABLED);
+
+ config.define(CONFIG_NAME_MQ_MQMD_MESSAGE_CONTEXT, Type.STRING, null,
+ ConfigDef.ValidString.in(null, "identity", "IDENTITY", "all", "ALL"),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_MQMD_MESSAGE_CONTEXT, CONFIG_GROUP_MQ, 31, Width.LONG,
+ CONFIG_DISPLAY_MQ_MQMD_MESSAGE_CONTEXT);
+ return config;
+ }
+
+
+ private static class ValidURL implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String stringValue = (String) value;
+ if (stringValue == null || stringValue.isEmpty()) {
+ // URLs are optional values
+ return;
+ }
+
+ try {
+ new URL(stringValue);
+
+ } catch (final MalformedURLException exception) {
+ throw new ConfigException(name, value, "Value must be a URL for a CCDT file");
+ }
+ }
+ }
+
+ private static class ValidClass implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ Class requiredClass = null;
+ final String stringValue = (String) value;
+ if (name.endsWith("builder")) {
+ requiredClass = MessageBuilder.class;
+ } else { // converter
+ requiredClass = Converter.class;
+
+ }
+ if (stringValue == null || stringValue.isEmpty()) {
+ return;
+ }
+ try {
+ Class.forName(stringValue).asSubclass(requiredClass).newInstance();
+ } catch (final ClassNotFoundException exc) {
+ log.error("Failed to validate class {}", stringValue);
+ throw new ConfigException(name, value, "Class must be accessible on the classpath for Kafka Connect");
+ } catch (final ClassCastException | IllegalAccessException exc) {
+ log.error("Failed to validate class {}", stringValue);
+ throw new ConfigException(name, value, "Class must be an implementation of " + requiredClass.getCanonicalName());
+ } catch (final InstantiationException exc) {
+ log.error("Failed to validate class {}", stringValue);
+ throw new ConfigException(name, value, "Unable to create an instance of the class");
+ } catch (final NullPointerException exc) {
+ throw new ConfigException(name, value, "Value must not be null");
+ }
+ }
+ }
+
+ private static class ValidFileLocation implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String stringValue = (String) value;
+ if (stringValue == null || stringValue.isEmpty()) {
+ // URLs are optional values
+ return;
+ }
+ File f = null;
+ try {
+ f = new File(stringValue);
+ } catch (final Exception exception) {
+ throw new ConfigException(name, value, "Value must be a File Location");
+ }
+ if (!f.isFile()) {
+ throw new ConfigException(name, value, "Value must be a File location");
+ }
+ if (!f.canRead()) {
+ throw new ConfigException(name, value, "Value must be a readable file");
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
index 763c5e9..910b84b 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2017, 2020 IBM Corporation
+ * Copyright 2017, 2020, 2023, 2024 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,12 +21,10 @@
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Range;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
@@ -35,148 +33,29 @@
public class MQSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(MQSinkConnector.class);
- public static final String CONFIG_GROUP_MQ = "mq";
-
- public static final String CONFIG_NAME_MQ_QUEUE_MANAGER = "mq.queue.manager";
- public static final String CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER = "The name of the MQ queue manager.";
- public static final String CONFIG_DISPLAY_MQ_QUEUE_MANAGER = "Queue manager";
-
- public static final String CONFIG_NAME_MQ_CONNECTION_MODE = "mq.connection.mode";
- public static final String CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE = "The connection mode - bindings or client.";
- public static final String CONFIG_DISPLAY_MQ_CONNECTION_MODE = "Connection mode";
- public static final String CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT = "client";
- public static final String CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS = "bindings";
-
- public static final String CONFIG_NAME_MQ_CONNECTION_NAME_LIST = "mq.connection.name.list";
- public static final String CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST = "A list of one or more host(port) entries for connecting to the queue manager. Entries are separated with a comma.";
- public static final String CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST = "List of connection names for queue manager";
-
- public static final String CONFIG_NAME_MQ_CHANNEL_NAME = "mq.channel.name";
- public static final String CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME = "The name of the server-connection channel.";
- public static final String CONFIG_DISPLAY_MQ_CHANNEL_NAME = "Channel name";
-
- public static final String CONFIG_NAME_MQ_QUEUE = "mq.queue";
- public static final String CONFIG_DOCUMENTATION_MQ_QUEUE = "The name of the target MQ queue.";
- public static final String CONFIG_DISPLAY_MQ_QUEUE = "Target queue";
-
- public static final String CONFIG_NAME_MQ_USER_NAME = "mq.user.name";
- public static final String CONFIG_DOCUMENTATION_MQ_USER_NAME = "The user name for authenticating with the queue manager.";
- public static final String CONFIG_DISPLAY_MQ_USER_NAME = "User name";
-
- public static final String CONFIG_NAME_MQ_PASSWORD = "mq.password";
- public static final String CONFIG_DOCUMENTATION_MQ_PASSWORD = "The password for authenticating with the queue manager.";
- public static final String CONFIG_DISPLAY_MQ_PASSWORD = "Password";
-
- public static final String CONFIG_NAME_MQ_CCDT_URL = "mq.ccdt.url";
- public static final String CONFIG_DOCUMENTATION_MQ_CCDT_URL = "The CCDT URL to use to establish a connection to the queue manager.";
- public static final String CONFIG_DISPLAY_MQ_CCDT_URL = "CCDT URL";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER = "mq.message.builder";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER = "The class used to build the MQ messages.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER = "Message builder";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BODY_JMS = "mq.message.body.jms";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS = "Whether to generate the message body as a JMS message type.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS = "Message body as JMS";
-
- public static final String CONFIG_NAME_MQ_TIME_TO_LIVE = "mq.time.to.live";
- public static final String CONFIG_DOCUMENTATION_MQ_TIME_TO_LIVE = "Time-to-live in milliseconds for messages sent to MQ.";
- public static final String CONFIG_DISPLAY_MQ_TIME_TO_LIVE = "Message time-to-live (ms)";
-
- public static final String CONFIG_NAME_MQ_PERSISTENT = "mq.persistent";
- public static final String CONFIG_DOCUMENTATION_MQ_PERSISTENT = "Send persistent or non-persistent messages to MQ.";
- public static final String CONFIG_DISPLAY_MQ_PERSISTENT = "Send persistent messages";
-
- public static final String CONFIG_NAME_MQ_SSL_CIPHER_SUITE = "mq.ssl.cipher.suite";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE = "The name of the cipher suite for the TLS (SSL) connection.";
- public static final String CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE = "SSL cipher suite";
-
- public static final String CONFIG_NAME_MQ_SSL_PEER_NAME = "mq.ssl.peer.name";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
- public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";
-
- public static final String CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION = "mq.ssl.keystore.location";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION = "The path to the JKS keystore to use for the TLS (SSL) connection.";
- public static final String CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION = "SSL keystore location";
-
- public static final String CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD = "mq.ssl.keystore.password";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD = "The password of the JKS keystore to use for the TLS (SSL) connection.";
- public static final String CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD = "SSL keystore password";
-
- public static final String CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION = "mq.ssl.truststore.location";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION = "The path to the JKS truststore to use for the TLS (SSL) connection.";
- public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION = "SSL truststore location";
-
- public static final String CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD = "mq.ssl.truststore.password";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD = "The password of the JKS truststore to use for the TLS (SSL) connection.";
- public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD = "SSL truststore password";
-
- public static final String CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "mq.ssl.use.ibm.cipher.mappings";
- public static final String CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Whether to set system property to control use of IBM cipher mappings.";
- public static final String CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Use IBM cipher mappings";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_KEY_HEADER = "mq.message.builder.key.header";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_KEY_HEADER = "The JMS message header to set from the Kafka record key.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_KEY_HEADER = "Record builder key header";
- public static final String CONFIG_VALUE_MQ_MESSAGE_BUILDER_KEY_HEADER_JMSCORRELATIONID = "JMSCorrelationID";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_VALUE_CONVERTER = "mq.message.builder.value.converter";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_VALUE_CONVERTER = "Prefix for configuring message builder's value converter.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_VALUE_CONVERTER = "Message builder's value converter";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY = "mq.message.builder.topic.property";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY = "The JMS message property to set from the Kafka topic.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_TOPIC_PROPERTY = "Kafka topic message property";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY = "mq.message.builder.partition.property";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY = "The JMS message property to set from the Kafka partition.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_PARTITION_PROPERTY = "Kafka partition message property";
-
- public static final String CONFIG_NAME_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY = "mq.message.builder.offset.property";
- public static final String CONFIG_DOCUMENTATION_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY = "The JMS message property to set from the Kafka offset.";
- public static final String CONFIG_DISPLAY_MQ_MESSAGE_BUILDER_OFFSET_PROPERTY = "Kafka offset message property";
-
- public static final String CONFIG_NAME_MQ_REPLY_QUEUE = "mq.reply.queue";
- public static final String CONFIG_DOCUMENTATION_MQ_REPLY_QUEUE = "The name of the reply-to queue, as a queue name or URI.";
- public static final String CONFIG_DISPLAY_MQ_REPLY_QUEUE = "Reply-to queue";
-
- public static final String CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP = "mq.user.authentication.mqcsp";
- public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP).";
- public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP";
-
- public static final String CONFIG_NAME_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "mq.kafka.headers.copy.to.jms.properties";
- public static final String CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Whether to copy Kafka headers to JMS message properties.";
- public static final String CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Copy Kafka headers to JMS message properties";
-
- public static final String CONFIG_NAME_MQ_RETRY_BACKOFF_MS = "mq.retry.backoff.ms";
- public static final String CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS = "Time to wait, in milliseconds, before retrying after retriable exceptions";
- public static final String CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS = "Retry backoff (ms)";
-
-
- public static String version = "1.5.2";
+ public static String version = "2.2.0";
private Map configProps;
- /**
- * Get the version of this connector.
+ /** Get the version of this connector.
*
- * @return the version, formatted as a String
- */
- @Override public String version() {
+ * @return the version, formatted as a String */
+ @Override
+ public String version() {
return version;
}
- /**
- * Start this Connector. This method will only be called on a clean Connector, i.e. it has
- * either just been instantiated and initialized or {@link #stop()} has been invoked.
+ /** Start this Connector. This method will only be called on a clean Connector, i.e. it has either just been
+ * instantiated and initialized or {@link #stop()} has been invoked.
*
- * @param props configuration settings
- */
- @Override public void start(final Map props) {
+ * @param props
+ * configuration settings */
+ @Override
+ public void start(final Map props) {
log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props);
configProps = props;
- for (final Entry entry: props.entrySet()) {
+ for (final Entry entry : props.entrySet()) {
final String value;
if (entry.getKey().toLowerCase(Locale.ENGLISH).contains("password")) {
value = "[hidden]";
@@ -189,162 +68,64 @@ public class MQSinkConnector extends SinkConnector {
log.trace("[{}] Exit {}.start", Thread.currentThread().getId(), this.getClass().getName());
}
- /**
- * Returns the Task implementation for this Connector.
- */
- @Override public Class extends Task> taskClass() {
+ /** Returns the Task implementation for this Connector. */
+ @Override
+ public Class extends Task> taskClass() {
return MQSinkTask.class;
- }
+ }
- /**
- * Returns a set of configurations for Tasks based on the current configuration,
- * producing at most count configurations.
+ /** Returns a set of configurations for Tasks based on the current configuration, producing at most count
+ * configurations.
*
- * @param maxTasks maximum number of configurations to generate
- * @return configurations for Tasks
- */
- @Override public List