diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java index 12f362380..483f65f70 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.HttpHostConnectException; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.message.BasicHeader; @@ -131,7 +132,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { } } - public void startListener() throws MqttException { + public boolean startListener() throws MqttException { if (this.mqttBrokerConnectionConfiguration.getUsername() != null && this.mqttBrokerConnectionConfiguration.getDcrUrl() != null) { String username = this.mqttBrokerConnectionConfiguration.getUsername(); @@ -151,12 +152,12 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName() + - "_" + tenantDomain); + + mqttBrokerConnectionConfiguration.getAdapterName() + + "_" + tenantDomain); registrationProfile.setIsSaasApp(false); } else { registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName()); + + mqttBrokerConnectionConfiguration.getAdapterName()); registrationProfile.setIsSaasApp(true); } String jsonString = registrationProfile.toJSON(); @@ -164,8 +165,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { postMethod.setEntity(requestEntity); String basicAuth = getBase64Encode(username, password); postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME, - MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + - basicAuth)); + MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + + basicAuth)); HttpResponse httpResponse = httpClient.execute(postMethod); if (httpResponse != null) { String response = MQTTUtil.getResponseString(httpResponse); @@ -182,21 +183,29 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { log.error(msg, e); } } + } catch (HttpHostConnectException e) { + log.error("Keymanager is unreachable, Waiting...."); + return false; } catch (MalformedURLException e) { log.error("Invalid dcrUrl : " + dcrUrlString); + return false; } catch (JWTClientException | UserStoreException e) { log.error("Failed to create an oauth token with jwt grant type.", e); + return false; } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) { log.error("Failed to create a http connection.", e); + return false; } } } try { mqttClient.connect(connectionOptions); } catch (MqttException e) { - log.error("Broker is unreachable, Waiting....."); + log.warn("Broker is unreachable, Waiting....."); + return false; } mqttClient.subscribe(topic); + return true; } @@ -271,10 +280,14 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { while (!connectionSucceeded) { try { connectionDuration = connectionDuration * MQTTEventAdapterConstants.RECONNECTION_PROGRESS_FACTOR; + if (connectionDuration > MQTTEventAdapterConstants.MAXIMUM_RECONNECTION_DURATION) { + connectionDuration = MQTTEventAdapterConstants.MAXIMUM_RECONNECTION_DURATION; + } Thread.sleep(connectionDuration); - startListener(); - connectionSucceeded = true; - log.info("MQTT Connection successful"); + if (startListener()) { + connectionSucceeded = true; + log.info("MQTT Connection successful"); + } } catch (InterruptedException e) { log.error("Interruption occurred while waiting for reconnection", e); } catch (MqttException e) { diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java index dc0e45413..7d52fb53c 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java @@ -50,6 +50,7 @@ public class MQTTEventAdapterConstants { public static final int INITIAL_RECONNECTION_DURATION = 4000; public static final int RECONNECTION_PROGRESS_FACTOR = 2; + public static final int MAXIMUM_RECONNECTION_DURATION = 60000; public static final String EMPTY_STRING = ""; public static final String GRANT_TYPE_PARAM_NAME = "grant_type";