|
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.apache.http.HttpResponse;
|
|
|
|
|
import org.apache.http.client.HttpClient;
|
|
|
|
|
import org.apache.http.client.methods.HttpPost;
|
|
|
|
|
import org.apache.http.conn.HttpHostConnectException;
|
|
|
|
|
import org.apache.http.entity.ContentType;
|
|
|
|
|
import org.apache.http.entity.StringEntity;
|
|
|
|
|
import org.apache.http.message.BasicHeader;
|
|
|
|
@ -131,7 +132,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void startListener() throws MqttException {
|
|
|
|
|
public boolean startListener() throws MqttException {
|
|
|
|
|
if (this.mqttBrokerConnectionConfiguration.getUsername() != null &&
|
|
|
|
|
this.mqttBrokerConnectionConfiguration.getDcrUrl() != null) {
|
|
|
|
|
String username = this.mqttBrokerConnectionConfiguration.getUsername();
|
|
|
|
@ -151,12 +152,12 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|
|
|
|
registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE);
|
|
|
|
|
if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) {
|
|
|
|
|
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
|
|
|
|
|
+ mqttBrokerConnectionConfiguration.getAdapterName() +
|
|
|
|
|
"_" + tenantDomain);
|
|
|
|
|
+ mqttBrokerConnectionConfiguration.getAdapterName() +
|
|
|
|
|
"_" + tenantDomain);
|
|
|
|
|
registrationProfile.setIsSaasApp(false);
|
|
|
|
|
} else {
|
|
|
|
|
registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
|
|
|
|
|
+ mqttBrokerConnectionConfiguration.getAdapterName());
|
|
|
|
|
+ mqttBrokerConnectionConfiguration.getAdapterName());
|
|
|
|
|
registrationProfile.setIsSaasApp(true);
|
|
|
|
|
}
|
|
|
|
|
String jsonString = registrationProfile.toJSON();
|
|
|
|
@ -164,8 +165,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|
|
|
|
postMethod.setEntity(requestEntity);
|
|
|
|
|
String basicAuth = getBase64Encode(username, password);
|
|
|
|
|
postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME,
|
|
|
|
|
MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX +
|
|
|
|
|
basicAuth));
|
|
|
|
|
MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX +
|
|
|
|
|
basicAuth));
|
|
|
|
|
HttpResponse httpResponse = httpClient.execute(postMethod);
|
|
|
|
|
if (httpResponse != null) {
|
|
|
|
|
String response = MQTTUtil.getResponseString(httpResponse);
|
|
|
|
@ -182,21 +183,29 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (HttpHostConnectException e) {
|
|
|
|
|
log.error("Keymanager is unreachable, Waiting....");
|
|
|
|
|
return false;
|
|
|
|
|
} catch (MalformedURLException e) {
|
|
|
|
|
log.error("Invalid dcrUrl : " + dcrUrlString);
|
|
|
|
|
return false;
|
|
|
|
|
} catch (JWTClientException | UserStoreException e) {
|
|
|
|
|
log.error("Failed to create an oauth token with jwt grant type.", e);
|
|
|
|
|
return false;
|
|
|
|
|
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) {
|
|
|
|
|
log.error("Failed to create a http connection.", e);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
mqttClient.connect(connectionOptions);
|
|
|
|
|
} catch (MqttException e) {
|
|
|
|
|
log.error("Broker is unreachable, Waiting.....");
|
|
|
|
|
log.warn("Broker is unreachable, Waiting.....");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
mqttClient.subscribe(topic);
|
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -271,10 +280,14 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|
|
|
|
while (!connectionSucceeded) {
|
|
|
|
|
try {
|
|
|
|
|
connectionDuration = connectionDuration * MQTTEventAdapterConstants.RECONNECTION_PROGRESS_FACTOR;
|
|
|
|
|
if (connectionDuration > MQTTEventAdapterConstants.MAXIMUM_RECONNECTION_DURATION) {
|
|
|
|
|
connectionDuration = MQTTEventAdapterConstants.MAXIMUM_RECONNECTION_DURATION;
|
|
|
|
|
}
|
|
|
|
|
Thread.sleep(connectionDuration);
|
|
|
|
|
startListener();
|
|
|
|
|
connectionSucceeded = true;
|
|
|
|
|
log.info("MQTT Connection successful");
|
|
|
|
|
if (startListener()) {
|
|
|
|
|
connectionSucceeded = true;
|
|
|
|
|
log.info("MQTT Connection successful");
|
|
|
|
|
}
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
log.error("Interruption occurred while waiting for reconnection", e);
|
|
|
|
|
} catch (MqttException e) {
|
|
|
|
|