Skip to content

Commit

Permalink
feat: support for writing to MQMD (ibm-messaging#36)
Browse files Browse the repository at this point in the history
Contributes to: event-integration/eventstreams-planning#12432

Signed-off-by: Joel Hanson <joel.hanson2@ibm.com>
  • Loading branch information
Joel Hanson authored and GitHub Enterprise committed Mar 12, 2024
1 parent 64a724b commit b5282cf
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/**
* Copyright 2024 IBM Corporation
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.eventstreams.connect.mqsink;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Test;

import com.ibm.eventstreams.connect.mqsink.util.MessageDescriptorBuilder;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;

public class MQMDTests extends MQSinkTaskAuthIT {

private Map<String, String> createDefaultConnectorProperties() {
final Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("mq.queue.manager", AbstractJMSContextIT.QMGR_NAME);
connectorProps.put("mq.connection.mode", AbstractJMSContextIT.CONNECTION_MODE);
connectorProps.put("mq.connection.name.list", AbstractJMSContextIT.HOST_NAME + "("
+ MQ_CONTAINER.getMappedPort(AbstractJMSContextIT.TCP_MQ_EXPOSED_PORT).toString() + ")");
connectorProps.put("mq.channel.name", AbstractJMSContextIT.CHANNEL_NAME);
connectorProps.put("mq.queue", AbstractJMSContextIT.DEFAULT_SINK_QUEUE_NAME);
connectorProps.put("mq.user.authentication.mqcsp", String.valueOf(USER_AUTHENTICATION_MQCSP));
connectorProps.put("mq.user.name", AbstractJMSContextIT.APP_USERNAME);
connectorProps.put("mq.password", AbstractJMSContextIT.APP_PASSWORD);
connectorProps.put("mq.message.mqmd.write", "true");
connectorProps.put("mq.message.mqmd.context", "ALL");
return connectorProps;
}

private MQQueueManager getQmgr() throws MQException {
Hashtable<Object, Object> props = new Hashtable<>();
props.put(MQConstants.HOST_NAME_PROPERTY, "localhost");
props.put(MQConstants.PORT_PROPERTY, MQ_CONTAINER.getMappedPort(AbstractJMSContextIT.TCP_MQ_EXPOSED_PORT));
props.put(MQConstants.CHANNEL_PROPERTY, AbstractJMSContextIT.CHANNEL_NAME);
props.put(MQConstants.USER_ID_PROPERTY, AbstractJMSContextIT.APP_USERNAME);
props.put(MQConstants.PASSWORD_PROPERTY, AbstractJMSContextIT.APP_PASSWORD);

return new MQQueueManager(AbstractJMSContextIT.QMGR_NAME, props);
}

private MQMessage[] mqGet(String queue) throws MQException, IOException {
MQQueue q = getQmgr().accessQueue(queue, MQConstants.MQOO_INPUT_SHARED | MQConstants.MQOO_INQUIRE);

List<MQMessage> messages = new ArrayList<>();
while (q.getCurrentDepth() > 0) {
MQMessage msg = new MQMessage();
q.get(msg);
messages.add(msg);
}
q.close();

return messages.toArray(new MQMessage[messages.size()]);
}

@Test
public void verifyAuthExceptionIfNoAuthContextPermission()
throws JMSException, MQException, IOException, InterruptedException {
MQ_CONTAINER.execInContainer("setmqaut",
"-m", AbstractJMSContextIT.QMGR_NAME,
"-n", AbstractJMSContextIT.DEFAULT_SINK_QUEUE_NAME,
"-p", AbstractJMSContextIT.APP_USERNAME,
"-t", "queue",
"-setall", "+get", "+browse", "+put", "+inq"); // The setall grant is removed if present

MQ_CONTAINER.execInContainer("setmqaut",
"-m", AbstractJMSContextIT.QMGR_NAME,
"-p", AbstractJMSContextIT.APP_USERNAME,
"-t", "qmgr",
"-setall"); // The setall grant is removed if present

final MQSinkTask newConnectTask = new MQSinkTask();
newConnectTask.initialize(mock(SinkTaskContext.class));

// configure a sink task for string messages
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.builder",
AbstractJMSContextIT.DEFAULT_MESSAGE_BUILDER);
connectorConfigProps.put("mq.message.body.jms", "true");

// start the task so that it connects to MQ
newConnectTask.start(connectorConfigProps);

// create a test message
final List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(AbstractJMSContextIT.TOPIC, AbstractJMSContextIT.PARTITION,
Schema.STRING_SCHEMA, "key0",
Schema.STRING_SCHEMA, "value0",
0L));

// An MQException is thrown with code MQRC_NOT_AUTHORIZED (reason code 2035) and
// compcode 2. This exception occurs when the MQ authorization for the queue and
// queue manager lacks the necessary permissions. Since MQRC_NOT_AUTHORIZED is
// considered a retriable exception, the system retries it, leading to
// RetriableException
assertThrows(RetriableException.class, () -> {
newConnectTask.put(records);
});
}

@Test
public void verifyMQMDWriteDisabled()
throws JMSException, MQException, IOException, InterruptedException {
final MQSinkTask newConnectTask = new MQSinkTask();

// configure a sink task for string messages
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.builder",
MessageDescriptorBuilder.class.getCanonicalName());

connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.message.mqmd.write", "false");

// start the task so that it connects to MQ
newConnectTask.start(connectorConfigProps);

// create some test message
final List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(AbstractJMSContextIT.TOPIC, AbstractJMSContextIT.PARTITION,
Schema.STRING_SCHEMA, "key0",
Schema.STRING_SCHEMA, "value0",
0L));
newConnectTask.put(records);

// flush the message
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
final TopicPartition topic = new TopicPartition(AbstractJMSContextIT.TOPIC, AbstractJMSContextIT.PARTITION);
final OffsetAndMetadata offset = new OffsetAndMetadata(0L);
offsets.put(topic, offset);
newConnectTask.flush(offsets);

// stop the task
newConnectTask.stop();

// verify that the message was submitted to MQ without descriptors
final MQMessage[] messagesInMQ = mqGet(AbstractJMSContextIT.DEFAULT_SINK_QUEUE_NAME);
assertEquals(1, messagesInMQ.length);
assertEquals("value0", messagesInMQ[0].readLine());
assertNotEquals("ThisIsMyId", new String(messagesInMQ[0].messageId).trim());
assertNotEquals("ThisIsMyApplicationData", messagesInMQ[0].applicationIdData.trim());
assertNotEquals("ThisIsMyPutApplicationName", messagesInMQ[0].putApplicationName.trim());
assertEquals("MYQMGR", messagesInMQ[0].replyToQueueManagerName.trim());
}

@Test
public void verifyMQMDWriteEnabled()
throws JMSException, MQException, IOException, InterruptedException {

// The following code block sets authorization permissions for a specified user
// (AbstractJMSContextIT.APP_USERNAME) on a particular queue
// (AbstractJMSContextIT.DEFAULT_SINK_QUEUE_NAME) within an IBM MQ environment,
// granting permissions to set all properties (`+setall`) about messages on the
// queue.
MQ_CONTAINER.execInContainer("setmqaut",
"-m", AbstractJMSContextIT.QMGR_NAME,
"-n", AbstractJMSContextIT.DEFAULT_SINK_QUEUE_NAME,
"-p", AbstractJMSContextIT.APP_USERNAME,
"-t", "queue",
"+setall", "+get", "+browse", "+put", "+inq");

// This code line grants authorization permissions for a specified user
// (`AbstractJMSContextIT.APP_USERNAME`) on a specific queue manager
// (`AbstractJMSContextIT.QMGR_NAME`) within an IBM MQ environment, allowing the
// user to set all properties (`+setall`) of the queue manager.
MQ_CONTAINER.execInContainer("setmqaut",
"-m", AbstractJMSContextIT.QMGR_NAME,
"-p", AbstractJMSContextIT.APP_USERNAME,
"-t", "qmgr",
"+setall");

// How to debug mq to list the authorization:
// For queue: dspmqaut -m MYQMGR -t queue -n DEV.QUEUE.1 -p app
// For queue Manager: dspmqaut -m MYQMGR -t qmgr -p app

final MQSinkTask newConnectTask = new MQSinkTask();

// configure a sink task for string messages
final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.builder",
MessageDescriptorBuilder.class.getCanonicalName());
connectorConfigProps.put("mq.message.body.jms", "true");

// start the task so that it connects to MQ
newConnectTask.start(connectorConfigProps);

// create a test message
final List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(AbstractJMSContextIT.TOPIC, AbstractJMSContextIT.PARTITION,
Schema.STRING_SCHEMA, "key0",
Schema.STRING_SCHEMA, "value0",
0L));
newConnectTask.put(records);

// flush the message
final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
final TopicPartition topic = new TopicPartition(AbstractJMSContextIT.TOPIC, AbstractJMSContextIT.PARTITION);
final OffsetAndMetadata offset = new OffsetAndMetadata(0L);
offsets.put(topic, offset);
newConnectTask.flush(offsets);

// stop the task
newConnectTask.stop();

// verify that the message was submitted to MQ with the correct descriptors
final MQMessage[] messagesInMQ = mqGet(AbstractJMSContextIT.DEFAULT_SINK_QUEUE_NAME);
assertEquals(1, messagesInMQ.length);
assertEquals("value0", messagesInMQ[0].readLine());
assertEquals("ThisIsMyId", new String(messagesInMQ[0].messageId).trim());
assertEquals("ThisIsMyApplicationData", messagesInMQ[0].applicationIdData.trim());
assertEquals("ThisIsMyPutApplicationName", messagesInMQ[0].putApplicationName.trim());
assertEquals("MYQMGR", messagesInMQ[0].replyToQueueManagerName.trim());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2024 IBM Corporation
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.eventstreams.connect.mqsink.util;

import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder;
import com.ibm.msg.client.jms.JmsConstants;

public class MessageDescriptorBuilder extends DefaultMessageBuilder {

@Override
public Message getJMSMessage(JMSContext jmsCtxt, SinkRecord record) {

Message message = super.getJMSMessage(jmsCtxt, record);

// add MQMD values
// JMS_IBM_MQMD_MsgId - byte[]
// JMS_IBM_MQMD_ApplIdentityData - string
// JMS_IBM_MQMD_PutApplName - string
// https://www.ibm.com/docs/en/ibm-mq/9.3?topic=application-jms-message-object-properties
try {
message.setObjectProperty(JmsConstants.JMS_IBM_MQMD_MSGID, "ThisIsMyId".getBytes());
message.setStringProperty(JmsConstants.JMS_IBM_MQMD_APPLIDENTITYDATA, "ThisIsMyApplicationData");
message.setStringProperty(JmsConstants.JMS_IBM_MQMD_PUTAPPLNAME, "ThisIsMyPutApplicationName");

} catch (JMSException e) {
throw new ConnectException("Failed to write property", e);
}

return message;
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/ibm/eventstreams/connect/mqsink/JMSWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -107,6 +108,21 @@ public void configure(final AbstractConfig config) throws ConnectException {
try {
mqConnFactory = mqConnectionHelper.createMQConnFactory();
queue = configureQueue(mqConnectionHelper.getQueueName(), mqConnectionHelper.isMessageBodyJms());
final Boolean mqmdWriteEnabled = config.getBoolean(MQSinkConfig.CONFIG_NAME_MQ_MQMD_WRITE_ENABLED);
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_WRITE_ENABLED, mqmdWriteEnabled);

if (mqmdWriteEnabled) {
String mqmdMessageContext = config.getString(MQSinkConfig.CONFIG_NAME_MQ_MQMD_MESSAGE_CONTEXT);
if (mqmdMessageContext != null) {
mqmdMessageContext = mqmdMessageContext.toLowerCase(Locale.ENGLISH);
}
if ("identity".equals(mqmdMessageContext)) {
queue.setIntProperty(WMQConstants.WMQ_MQMD_MESSAGE_CONTEXT,
WMQConstants.WMQ_MDCTX_SET_IDENTITY_CONTEXT);
} else if ("all".equals(mqmdMessageContext)) {
queue.setIntProperty(WMQConstants.WMQ_MQMD_MESSAGE_CONTEXT, WMQConstants.WMQ_MDCTX_SET_ALL_CONTEXT);
}
}
if (isExactlyOnceMode) {
stateQueue = configureQueue(mqConnectionHelper.getStateQueueName(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ public class MQSinkConfig {
public static final String CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS = "Time to wait, in milliseconds, before retrying after retriable exceptions";
public static final String CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS = "Retry backoff (ms)";

// https://www.ibm.com/docs/en/ibm-mq/9.3?topic=amffmcja-reading-writing-message-descriptor-from-mq-classes-jms-application
public static final String CONFIG_NAME_MQ_MQMD_WRITE_ENABLED = "mq.message.mqmd.write";
public static final String CONFIG_DISPLAY_MQ_MQMD_WRITE_ENABLED = "Enable MQMD Message Writing";
public static final String CONFIG_DOCUMENTATION_MQ_MQMD_WRITE_ENABLED = "This configuration option determines whether the MQMD structure will be written along with the message data. Enabling this option allows control information to accompany the application data during message transmission between sending and receiving applications. Disabling this option will exclude the MQMD structure from the message payload.";

// https://www.ibm.com/docs/en/ibm-mq/9.3?topic=application-jms-message-object-properties
public static final String CONFIG_NAME_MQ_MQMD_MESSAGE_CONTEXT = "mq.message.mqmd.context";
public static final String CONFIG_DISPLAY_MQ_MQMD_MESSAGE_CONTEXT = "MQMD Message Context";
public static final String CONFIG_DOCUMENTATION_MQ_MQMD_MESSAGE_CONTEXT = "This configuration option specifies the context in which MQMD properties are applied. Certain properties require this context to be set appropriately for them to take effect. Valid options for WMQ_MQMD_MESSAGE_CONTEXT are IDENTITY for WMQ_MDCTX_SET_IDENTITY_CONTEXT or ALL for WMQ_MDCTX_SET_ALL_CONTEXT.";

private static final Validator ANY_VALUE_VALID = null;

public static ConfigDef config() {
Expand Down Expand Up @@ -220,6 +230,15 @@ public static ConfigDef config() {

config.define(CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE, Type.STRING, null, ANY_VALUE_VALID, Importance.LOW, CONFIG_DOCUMENTATION_MQ_EXACTLY_ONCE_STATE_QUEUE, CONFIG_GROUP_MQ, 29, Width.LONG, CONFIG_DISPLAY_MQ_EXACTLY_ONCE_STATE_QUEUE);

config.define(CONFIG_NAME_MQ_MQMD_WRITE_ENABLED, Type.BOOLEAN, false, Importance.LOW,
CONFIG_DOCUMENTATION_MQ_MQMD_WRITE_ENABLED, CONFIG_GROUP_MQ, 30, Width.LONG,
CONFIG_DISPLAY_MQ_MQMD_WRITE_ENABLED);

config.define(CONFIG_NAME_MQ_MQMD_MESSAGE_CONTEXT, Type.STRING, null,
ConfigDef.ValidString.in(null, "identity", "IDENTITY", "all", "ALL"),
Importance.LOW,
CONFIG_DOCUMENTATION_MQ_MQMD_MESSAGE_CONTEXT, CONFIG_GROUP_MQ, 31, Width.LONG,
CONFIG_DISPLAY_MQ_MQMD_MESSAGE_CONTEXT);
return config;
}

Expand Down

0 comments on commit b5282cf

Please sign in to comment.