Kafka MQ Source Connector Exactly-Once Delivery
Last updated: 25 March 2025
This post contains the details for configuring the Kafka MQ source connector for exactly-once delivery.
Full details can be found on the IBM page exactly-once message delivery semantics in IBM MQ source connector.
More details can be found on exactly-once messaging on GitHub.
Prerequisites
- Kafka Connect version 3.3.0 or later.
- Can only be used in distributed mode, that is, with Docker or Kubernetes.
Refer to the IBM running the MQ source connector prerequisites page for full details.
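One way to confirm the Kafka Connect version is the worker's REST root endpoint, which reports it. This is a sketch; the host name is a placeholder and the default REST port of 8083 is assumed.

```shell
# The Connect REST root endpoint reports the worker version.
# Placeholder host; the default Kafka Connect REST port is 8083.
curl -s http://my-connect-host:8083/
# The response is JSON of the shape {"version":"...","commit":"...","kafka_cluster_id":"..."}
```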
Settings
Tasks max in the Kafka connector
For exactly-once messaging, tasksMax must be set to 1 in the Kafka MQ source connector yaml file.
tasksMax: 1
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: your-connector-name
  labels:
    :
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  # For exactly-once messaging, to avoid duplicates, tasksMax must be set to 1.
  tasksMax: 1
  :
  config:
    :
```
Replicas in the Kafka connect runtime
While I haven’t found this in the documentation, since tasksMax in the Kafka MQ source connector must be set to 1, by extension the replicas value in the Kafka Connect runtime should also be set to 1.
replicas: 1
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  :
spec:
  authentication:
    :
  bootstrapServers: your-bootstrap-url
  config:
    :
  image: your-image
  replicas: 1
  resources:
    :
```
Enable exactly-once in the Kafka MQ source connect runtime
To enable exactly-once messaging, add the following to the Kafka MQ source connect yaml file.
exactly.once.source.support: enabled
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  :
spec:
  authentication:
    :
  bootstrapServers: your-bootstrap-url
  config:
    # If you are using exactly-once messaging, set this to enabled.
    exactly.once.source.support: enabled
```
More details can be found in enabling exactly-once prerequisites.
Configure exactly-once in the Kafka MQ source connector
Add the state queue name in the Kafka MQ source connector yaml file.
Also add the reconnect option. For the possible values, refer to the Kafka Connect MQ source page.
mq.exactly.once.state.queue: specify-your-state-queue-name
mq.client.reconnect.options: QMGR
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  :
spec:
  :
  config:
    # For exactly-once messaging, set the state queue name.
    # Define the queue as PERSISTENT with DEFSOPT(EXCL).
    mq.exactly.once.state.queue: specify-your-state-queue-name
    mq.client.reconnect.options: QMGR
    mq.queue.manager: your-queue-manager-name
```
Refer to all connector configuration settings for a full list of the MQ source connector properties.
Transaction boundary
The transaction.boundary property must be set to poll. This is the default.
transaction.boundary: poll (default)
Refer to the exactly-once delivery page for details.
Also refer to the Kafka description of the transaction boundary.
Full yaml files
Full yaml files have been provided below to help configure exactly-once messaging.
Kafka MQ source connect runtime
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
  labels:
    # Change the cluster name to a unique name in the Kafka environment
    # https://ibm.github.io/event-automation/es/connecting/setting-up-connectors/#configure-the-connector
    eventstreams.ibm.com/cluster: my-kafka-connect-01
  # Set the Kafka Connect runtime name.
  # This name appears in the Kafka Connect tab of the Event Streams Operator.
  # Ensure you use this as the prefix in the "productChargedContainers" field later in this file.
  name: my-kafka-connect-name-01
  # The namespace where you will run your connector.
  namespace: my-ocp-namespace-name
spec:
  authentication:
    # As the connector runs in a namespace in OpenShift, use the keys for authorization rather than SCRAM.
    # The user key details will be provided by your Kafka admin when they create your userid that will access Kafka.
    # Ensure the admin sends you the secrets in a yaml file and that the file contains the user key
    # details (5 name-value key pairs) and not the SCRAM details (1 name-value key pair).
    # Create an OCP secret with these details.
    certificateAndKey:
      certificate: user.crt
      key: user.key
      # Set the below name to the same name you set when creating the secret containing the user key fields.
      secretName: my-user-keys-secret
    type: tls
  # Set the Kafka bootstrap URL.
  # Ensure the URL is the "internal" bootstrap URL. The URL can be found in the "cluster connection"
  # section of the Event Streams console.
  # The "internal" URL is used for namespace-to-namespace communication.
  # The "external" URL is used when you are accessing the Kafka cluster from outside of OpenShift.
  bootstrapServers: my-kafka-cluster-internal-bootstrap-url:9093
  config:
    # Uncomment the below if you are using exactly-once messaging.
    #exactly.once.source.support: enabled
    # If your Kafka cluster is set to use 3 brokers, set the factor values to 3.
    # The topic names must be unique within the Kafka environment. If the names are already
    # used, you'll get an error.
    # Set the prefix of the 3 topics to the "eventstreams.ibm.com/cluster" value.
    # Ensure the Kafka admin granted your user access to create the below topics. If this is not
    # done, you'll see an authorization error in the Kafka Connect pod logs.
    config.storage.replication.factor: 3
    config.storage.topic: my-kafka-connect-01-config
    group.id: my-kafka-connect-01
    offset.storage.replication.factor: 3
    offset.storage.topic: my-kafka-connect-01-offset-storage
    status.storage.replication.factor: 3
    status.storage.topic: my-kafka-connect-01-status-storage
  # Set the image name to the one you want to use.
  # In the below, my image name is "kc-mq-source-2.3.0" with tag "1.0".
  image: my-image-repo/kc-mq-source-2.3.0:1.0
  # The advice I received for the Kafka Connect runtime is to set replicas to 1.
  # This is particularly important when using exactly-once messaging, where tasksMax must be set to 1
  # to avoid duplicated messages. If replicas is set to 2, it would have the same effect as setting
  # tasksMax to 2.
  # Ensure the limits and request sizes are <= the quota values defined in OpenShift for your
  # namespace. If these values are larger, the pod won't start.
  replicas: 1
  resources:
    limits:
      cpu: 1
      memory: 2Gi
    requests:
      cpu: 1
      memory: 2Gi
  template:
    pod:
      metadata:
        annotations:
          cloudpakId: c8b82d189e7545f0892db9ef2731b90d
          cloudpakName: IBM Cloud Pak for Integration
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          # Change the below to be the "name" from earlier in this file with the suffix "-connect".
          productChargedContainers: my-kafka-connect-name-01-connect
          productCloudpakRatio: "2:1"
          productID: 2a79e49111f44ec3acd89608e56138f5
          productMetric: VIRTUAL_PROCESSOR_CORE
          productName: IBM Event Streams for Non Production
          productVersion: 11.3.1
  tls:
    # To use TLS on the connection to the Kafka Connect cluster, the CA cert of the cluster needs to
    # be saved in your namespace as a secret. The below name is the secret name.
    trustedCertificates:
      - certificate: ca.crt
        secretName: my-kafka-cluster-ca-cert
```
Kafka MQ source connector
```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  # Set the Kafka Connector name.
  # This name appears in the Kafka Connector tab of the Event Streams Operator.
  name: my-kafka-connector-name-01
  labels:
    # Set the same value as "eventstreams.ibm.com/cluster" from the Kafka Connect yaml file.
    eventstreams.ibm.com/cluster: my-kafka-connect-01
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  # For exactly-once messaging, to avoid duplicates, the value must be set to 1.
  #tasksMax: 1
  # Optionally add the autoRestart setting to handle recovery situations.
  #autoRestart:
  #  enabled: true
  #  maxRestarts: 10
  config:
    # For exactly-once messaging, set the state queue name.
    # Define this queue as PERSISTENT with DEFSOPT(EXCL).
    # mq.exactly.once.state.queue: MY_STATE_QUEUE_NAME
    # mq.client.reconnect.options: QMGR
    mq.queue.manager: MY_QMGR
    mq.connection.name.list: 10.1.1.1(1414)
    # For exactly-once messaging, set HBINT(30) on the MQ channel.
    mq.channel.name: MY_SVRCONN_CHANNEL
    mq.queue: MY_QUEUE_TO_GET_MESSAGES_FROM
    mq.user.name: ""
    mq.password: ""
    topic: MY-KAFKA-TOPIC-NAME
    mq.connection.mode: client
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    # In this example we set the converters to ByteArrayConverter to ensure the message contents
    # published to the topic are correct.
    # key.converter: org.apache.kafka.connect.storage.StringConverter
    # value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  # You can control the state of the connector with the below values.
  #state: running
  #state: paused
  #state: stopped
```
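As a sketch of how the two files above would be deployed, assuming they are saved as `kafka-connect.yaml` and `kafka-connector.yaml` (hypothetical file names) and that you are logged in to the cluster with `oc`:

```shell
# Apply the Kafka Connect runtime and the connector resource.
# File, resource, and namespace names are placeholders; adjust to your environment.
oc apply -f kafka-connect.yaml -n my-ocp-namespace-name
oc apply -f kafka-connector.yaml -n my-ocp-namespace-name

# Watch the Kafka Connect pod come up.
oc get pods -n my-ocp-namespace-name

# Check the connector status reported by the operator.
oc get kafkaconnector my-kafka-connector-name-01 -n my-ocp-namespace-name -o yaml
```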
MQ config
Set the HBINT on the MQ channel to 30 seconds.
It is recommended to set the DEFSOPT option on the state queue to EXCL.
Ensure the source queue (containing the messages that will be sent to Kafka) is set as persistent.
Ensure the state queue is set as persistent.
Refer to creating a state queue in IBM MQ for the IBM MQ details.
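The MQ settings above could be applied with MQSC commands along the following lines. This is a sketch; the queue manager, queue, and channel names are placeholders, and the commands assume shell access to the queue manager host.

```shell
# Placeholder object names; substitute your own.
runmqsc MY_QMGR <<'EOF'
* State queue for exactly-once delivery: default persistent, default exclusive input
DEFINE QLOCAL(MY.STATE.QUEUE) DEFPSIST(YES) DEFSOPT(EXCL)
* Source queue: default persistent
ALTER QLOCAL(MY.QUEUE.TO.GET.MESSAGES.FROM) DEFPSIST(YES)
* Heartbeat interval of 30 seconds on the SVRCONN channel
ALTER CHANNEL(MY.SVRCONN.CHANNEL) CHLTYPE(SVRCONN) HBINT(30)
EOF
```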
MQ message
Ensure the expiry time on the messages is set to unlimited.
Ensure all messages put to the source queue and state queue are persistent. If an app puts to the source queue, ensure the application doesn’t override the queue default by setting the messages as non-persistent.
Consuming apps
Ensure all consumers of the topic set the Kafka isolation level to read_committed. This ensures they are only consuming transactionally committed messages.
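For example, the console consumer shipped with Kafka accepts the isolation level on the command line. The bootstrap address and topic name below are placeholders.

```shell
# Consume only transactionally committed messages.
# Placeholder bootstrap address and topic name.
./bin/kafka-console-consumer.sh \
  --bootstrap-server my-kafka-cluster-bootstrap:9092 \
  --topic MY-KAFKA-TOPIC-NAME \
  --isolation-level read_committed \
  --from-beginning
```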
Startup
Ensure the exactly-once ‘state’ queue is empty before starting the connector with exactly-once enabled.
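One way to check the state queue is empty, assuming access to runmqsc and the same placeholder names as above:

```shell
# Display the current depth of the state queue; it should be 0 before startup.
echo "DISPLAY QLOCAL(MY.STATE.QUEUE) CURDEPTH" | runmqsc MY_QMGR
```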