diff --git a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/MQTTEventAdapterFactory.java b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/MQTTEventAdapterFactory.java index 0987956c2..027125fff 100644 --- a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/MQTTEventAdapterFactory.java +++ b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/MQTTEventAdapterFactory.java @@ -124,6 +124,15 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory { clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT)); propertyList.add(clientId); + // set qos + Property qosProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS); + qosProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS)); + qosProperty.setRequired(false); + qosProperty.setOptions(new String[]{"0", "1", "2"}); + qosProperty.setDefaultValue("0"); + + propertyList.add(qosProperty); + return propertyList; } diff --git a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTAdapterListener.java index 10293bc8c..9f12a5544 100644 --- a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -56,6 +56,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration; private String topic; + private int qos; private String topicStructure; private String tenantDomain; private volatile boolean connectionSucceeded = false; @@ -79,6 +80,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive(); this.topicStructure = new String(topic); this.topic = PropertyUtils.replacePlaceholders(topic); + this.qos = mqttBrokerConnectionConfiguration.getQos(); this.eventAdapterListener = inputEventAdapterListener; this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); @@ -158,7 +160,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { return false; } try { - mqttClient.subscribe(topic); + mqttClient.subscribe(topic, qos); log.info("mqtt receiver subscribed to topic: " + topic); } catch (MqttException e) { log.error("Failed to subscribe to topic: " + topic + ", Retrying....."); diff --git a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java index 06e00e42b..472eec277 100644 --- a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java +++ b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java @@ -32,6 +32,7 @@ public class MQTTBrokerConnectionConfiguration { private String brokerScopes = null; private boolean cleanSession = true; private int keepAlive; + private int qos; private String brokerUrl; private String dcrUrl; private String contentValidatorType; @@ -83,6 +84,14 @@ public class MQTTBrokerConnectionConfiguration { return adapterName; } + public int getQos() { + return qos; + } + + public void setQos(int qos) { + this.qos = qos; + } + public MQTTBrokerConnectionConfiguration(InputEventAdapterConfiguration eventAdapterConfiguration, Map globalProperties) throws InputEventAdapterException { @@ -131,6 +140,15 @@ public class MQTTBrokerConnectionConfiguration { } else { keepAlive = MQTTEventAdapterConstants.ADAPTER_CONF_DEFAULT_KEEP_ALIVE; } + + String qosVal = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS); + if (qosVal != null && !qosVal.isEmpty()) { + this.qos = Integer.parseInt(qosVal); + } else { + qosVal = eventAdapterConfiguration.getProperties().get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS); + this.qos = Integer.parseInt(qosVal); + } + this.contentTransformerType = eventAdapterConfiguration.getProperties() .get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE); } diff --git a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTEventAdapterConstants.java b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTEventAdapterConstants.java index fc047cffd..7d26fc1b5 100644 --- a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTEventAdapterConstants.java +++ b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/input/adapter/mqtt/util/MQTTEventAdapterConstants.java @@ -47,6 +47,7 @@ public class MQTTEventAdapterConstants { public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint"; public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive"; public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000; + public static final String ADAPTER_MESSAGE_QOS = "qos"; public static final int INITIAL_RECONNECTION_DURATION = 4000; public static final int RECONNECTION_PROGRESS_FACTOR = 2; diff --git a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/resources/io/entgra/device/mgt/plugins/input/adapter/mqtt/i18n/Resources.properties b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/resources/io/entgra/device/mgt/plugins/input/adapter/mqtt/i18n/Resources.properties index aee8d9a4c..9552188b8 100644 --- a/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/resources/io/entgra/device/mgt/plugins/input/adapter/mqtt/i18n/Resources.properties +++ b/components/extensions/cdmf-transport-adapters/input/io.entgra.device.mgt.plugins.input.adapter.mqtt/src/main/resources/io/entgra/device/mgt/plugins/input/adapter/mqtt/i18n/Resources.properties @@ -18,6 +18,7 @@ topic=Topic topic.hint=Topic subscribed +qos=Quality of Service clientId=Client Id clientId.hint=client identifier is used by the server to identify a client when it reconnects, It used for durable subscriptions or reliable delivery of messages is required. url=Broker Url (Not required), If it is not provided then it will connect to the default broker. diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java index 6e63d8169..14f1ed1d9 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java @@ -92,7 +92,7 @@ public class MQTTEventAdapterFactory extends OutputEventAdapterFactory { qos.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS)); qos.setRequired(false); qos.setOptions(new String[]{"0", "1", "2"}); - qos.setDefaultValue("2"); + qos.setDefaultValue("0"); // set topic Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);