tcdlpds 6 months ago
commit fa9b38289a

@ -124,6 +124,15 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT)); clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
propertyList.add(clientId); propertyList.add(clientId);
// set qos
Property qosProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS);
qosProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS));
qosProperty.setRequired(false);
qosProperty.setOptions(new String[]{"0", "1", "2"});
qosProperty.setDefaultValue("0");
propertyList.add(qosProperty);
return propertyList; return propertyList;
} }

@ -58,6 +58,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration; private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration;
private String topic; private String topic;
private int qos;
private String topicStructure; private String topicStructure;
private String tenantDomain; private String tenantDomain;
private volatile boolean connectionSucceeded = false; private volatile boolean connectionSucceeded = false;
@ -81,6 +82,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive(); int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive();
this.topicStructure = new String(topic); this.topicStructure = new String(topic);
this.topic = PropertyUtils.replacePlaceholders(topic); this.topic = PropertyUtils.replacePlaceholders(topic);
this.qos = mqttBrokerConnectionConfiguration.getQos();
this.eventAdapterListener = inputEventAdapterListener; this.eventAdapterListener = inputEventAdapterListener;
this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
@ -162,7 +164,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
return false; return false;
} }
try { try {
mqttClient.subscribe(topic); mqttClient.subscribe(topic, qos);
log.info("mqtt receiver subscribed to topic: " + topic); log.info("mqtt receiver subscribed to topic: " + topic);
} catch (MqttException e) { } catch (MqttException e) {
log.error("Failed to subscribe to topic: " + topic + ", Retrying....."); log.error("Failed to subscribe to topic: " + topic + ", Retrying.....");

@ -32,6 +32,7 @@ public class MQTTBrokerConnectionConfiguration {
private String brokerScopes = null; private String brokerScopes = null;
private boolean cleanSession = true; private boolean cleanSession = true;
private int keepAlive; private int keepAlive;
private int qos;
private String brokerUrl; private String brokerUrl;
private String dcrUrl; private String dcrUrl;
private String contentValidatorType; private String contentValidatorType;
@ -83,6 +84,14 @@ public class MQTTBrokerConnectionConfiguration {
return adapterName; return adapterName;
} }
public int getQos() {
return qos;
}
public void setQos(int qos) {
this.qos = qos;
}
public MQTTBrokerConnectionConfiguration(InputEventAdapterConfiguration eventAdapterConfiguration, public MQTTBrokerConnectionConfiguration(InputEventAdapterConfiguration eventAdapterConfiguration,
Map<String, String> globalProperties) throws InputEventAdapterException { Map<String, String> globalProperties) throws InputEventAdapterException {
@ -131,6 +140,15 @@ public class MQTTBrokerConnectionConfiguration {
} else { } else {
keepAlive = MQTTEventAdapterConstants.ADAPTER_CONF_DEFAULT_KEEP_ALIVE; keepAlive = MQTTEventAdapterConstants.ADAPTER_CONF_DEFAULT_KEEP_ALIVE;
} }
String qosVal = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS);
if (qosVal != null && !qosVal.isEmpty()) {
this.qos = Integer.parseInt(qosVal);
} else {
qosVal = eventAdapterConfiguration.getProperties().get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS);
this.qos = Integer.parseInt(qosVal);
}
this.contentTransformerType = eventAdapterConfiguration.getProperties() this.contentTransformerType = eventAdapterConfiguration.getProperties()
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE); .get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE);
} }

@ -47,6 +47,7 @@ public class MQTTEventAdapterConstants {
public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint"; public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint";
public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive"; public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive";
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000; public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000;
public static final String ADAPTER_MESSAGE_QOS = "qos";
public static final int INITIAL_RECONNECTION_DURATION = 4000; public static final int INITIAL_RECONNECTION_DURATION = 4000;
public static final int RECONNECTION_PROGRESS_FACTOR = 2; public static final int RECONNECTION_PROGRESS_FACTOR = 2;

@ -18,6 +18,7 @@
topic=Topic topic=Topic
topic.hint=Topic subscribed topic.hint=Topic subscribed
qos=Quality of Service
clientId=Client Id clientId=Client Id
clientId.hint=client identifier is used by the server to identify a client when it reconnects, It used for durable subscriptions or reliable delivery of messages is required. clientId.hint=client identifier is used by the server to identify a client when it reconnects, It used for durable subscriptions or reliable delivery of messages is required.
url=Broker Url (Not required), If it is not provided then it will connect to the default broker. url=Broker Url (Not required), If it is not provided then it will connect to the default broker.

@ -92,7 +92,7 @@ public class MQTTEventAdapterFactory extends OutputEventAdapterFactory {
qos.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS)); qos.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS));
qos.setRequired(false); qos.setRequired(false);
qos.setOptions(new String[]{"0", "1", "2"}); qos.setOptions(new String[]{"0", "1", "2"});
qos.setDefaultValue("2"); qos.setDefaultValue("0");
// set topic // set topic
Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC); Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);

Loading…
Cancel
Save