How to create your first Kafka MQ source connector

Last updated: 27 March 2025

This post describes the steps to create your first Kafka MQ source connector.

Prerequisites

  • The Kafka Connect runtime has been deployed and started.

  • The queue manager has been created and configured for a Kafka connector.

  • The Kafka topic exists.
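
Before continuing, you can optionally sanity-check these prerequisites with the oc CLI. This is a sketch: the resource name below (my-kafka-connect-01) is the example Kafka Connect cluster name used later in this post, and the custom resource kinds assume the IBM Event Streams operator is installed.

```shell
# Confirm the Kafka Connect runtime custom resource exists and is ready.
oc get kafkaconnect my-kafka-connect-01

# List the Kafka topics managed in this project to confirm your topic exists.
oc get kafkatopics
```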

Create the Kafka MQ source connector YAML file

Create a YAML file named kc-mq.yaml as follows.

Refer to the comments in the YAML file for additional details on configuring this file.

apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  # Set the Kafka Connector name.
  # This name appears in the Kafka Connector tab of the Event Streams Operator.
  name: my-kafka-connector-name-01
  labels:
    # Set this to the same value as "eventstreams.ibm.com/cluster" from the Kafka Connect YAML file.
    eventstreams.ibm.com/cluster: my-kafka-connect-01
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  # For exactly-once messaging, tasksMax must be set to 1 to avoid duplicate messages.
  #tasksMax: 1
  # Optionally add the autoRestarts setting to handle recovery situations.
  #autoRestart:
  #  enabled: true
  #  maxRestarts: 10
  config:
    # For exactly-once messaging, set the state queue name.
    # Define this queue as PERSISTENT with DEFSOPT(EXCL).
    # mq.exactly.once.state.queue: MY_STATE_QUEUE_NAME
    mq.queue.manager: MY_QMGR
    mq.connection.name.list: 10.1.1.1(1414)
    # For exactly-once messaging, set HBINT(30) on the MQ channel.
    mq.channel.name: MY_SVRCONN_CHANNEL
    mq.queue: MY_QUEUE_TO_GET_MESSAGES_FROM
    mq.user.name: ""
    mq.password: ""
    topic: MY-KAFKA-TOPIC-NAME
    mq.connection.mode: client
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    # In this example we set the converters to ByteArrayConverter to ensure the message contents published to the topic are correct.
    # key.converter: org.apache.kafka.connect.storage.StringConverter
    # value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  # You can control the state of the connector with the values below.
  #state: running
  #state: paused
  #state: stopped
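
Before deploying, you can optionally validate the file without changing anything on the cluster. The --dry-run=client flag is a standard oc/kubectl option that parses and validates the manifest locally.

```shell
# Validate kc-mq.yaml without creating the resource on the cluster.
oc apply -f kc-mq.yaml --dry-run=client
```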

Deploy the Kafka MQ source connector

The process below uses the oc CLI.

Log in to OpenShift

Log in to OpenShift using the oc CLI.

oc login -u YOUR_USER_ID --server=https://OPENSHIFT_URL:OPENSHIFT_PORT
oc login -u sean --server=https://api.crc.testing:6443

Select the correct project

List the projects.

oc projects
/home/sean>oc projects
You have access to the following projects and can switch between them with 'oc project <projectname>':

  * my-kafka-dev
    my-kafka-qc

Using project "my-kafka-dev" on server "https://api.crc.testing:6443".

If you are not on the correct project (the one with the ‘*’), select it.

oc project my-kafka-qc
/home/sean>oc project my-kafka-qc
Now using project "my-kafka-qc" on server "https://api.crc.testing:6443".

Deploy the Kafka MQ source connector

Issue the following command to deploy the connector.

oc apply -f kc-mq.yaml
/home/sean>oc apply -f kc-mq.yaml
kafkaconnector.eventstreams.ibm.com/my-kafka-connector-name-01 created
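
After the apply, you can check the connector custom resource itself, not just the pod. The connector name below is the example name from the kc-mq.yaml file above.

```shell
# Show the connector resource; the READY column indicates whether it started.
oc get kafkaconnector my-kafka-connector-name-01

# Inspect conditions and task status if the connector is not ready.
oc describe kafkaconnector my-kafka-connector-name-01
```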

To check the deployment status, you can log into the OpenShift console and view the pod logs.

Alternatively, you can use the OC CLI.

Get the pod name.

oc get pods
/home/sean>oc get pods
NAME                                              READY   STATUS    RESTARTS   AGE
my-kafka-connect-01-connect-0                     1/1     Running   0          31s

You can tail the logs as follows and check for any errors.

oc logs -f my-kafka-connect-01-connect-0
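
If the log output is long, you can filter it for problems instead of tailing. This is a sketch; the pod name is the example from the oc get pods output above, and the grep pattern is just a starting point.

```shell
# Scan the Connect pod logs for errors or exceptions.
oc logs my-kafka-connect-01-connect-0 | grep -i -E "error|exception"
```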

Verification

Assuming there are no errors in the pod logs, use IBM MQ Explorer to confirm that the open input count on the MQ queue is 1.
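
If you prefer the command line to MQ Explorer, the open input count can also be checked with runmqsc on the queue manager host. The queue and queue manager names are the example values from the kc-mq.yaml file above.

```shell
# Display the queue status; IPPROCS shows the number of open input handles.
echo "DISPLAY QSTATUS(MY_QUEUE_TO_GET_MESSAGES_FROM) IPPROCS" | runmqsc MY_QMGR
```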

References

To configure a Kafka MQ source connector, refer to the following URL, which lists all the properties you can configure: