Merge pull request #269 from ayyoob/transport

fixed exponential backup logic
revert-dabc3590
sumedharubasinghe 9 years ago
commit b8b0eda0bb

@ -69,7 +69,6 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
private InputEventAdapterListener eventAdapterListener = null; private InputEventAdapterListener eventAdapterListener = null;
public MQTTAdapterListener(MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration, public MQTTAdapterListener(MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration,
String topic, String mqttClientId, String topic, String mqttClientId,
InputEventAdapterListener inputEventAdapterListener, int tenantId) { InputEventAdapterListener inputEventAdapterListener, int tenantId) {
@ -173,22 +172,26 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON); StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON);
postMethod.setEntity(requestEntity); postMethod.setEntity(requestEntity);
HttpResponse httpResponse = httpClient.execute(postMethod); HttpResponse httpResponse = httpClient.execute(postMethod);
String response = MQTTUtil.getResponseString(httpResponse); if (httpResponse != null) {
try { String response = MQTTUtil.getResponseString(httpResponse);
JSONParser jsonParser = new JSONParser(); try {
JSONObject jsonPayload = (JSONObject) jsonParser.parse(response); if (response != null) {
String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID); JSONParser jsonParser = new JSONParser();
String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET); JSONObject jsonPayload = (JSONObject) jsonParser.parse(response);
JWTClientManagerService jwtClientManagerService = MQTTUtil.getJWTClientManagerService(); String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID);
AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken( String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET);
clientId, clientSecret, username, scopes); JWTClientManagerService jwtClientManagerService = MQTTUtil.getJWTClientManagerService();
connectionOptions.setUserName(accessTokenInfo.getAccessToken()); AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken(
} catch (ParseException e) { clientId, clientSecret, username, scopes);
String msg = "error occurred while parsing client credential payload"; connectionOptions.setUserName(accessTokenInfo.getAccessToken());
log.error(msg, e); }
} catch (JWTClientException e) { } catch (ParseException e) {
String msg = "error occurred while parsing the response from JWT Client"; String msg = "error occurred while parsing client credential payload";
log.error(msg, e); log.error(msg, e);
} catch (JWTClientException e) {
String msg = "error occurred while parsing the response from JWT Client";
log.error(msg, e);
}
} }
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
log.error("Invalid dcrUrl : " + dcrUrlString); log.error("Invalid dcrUrl : " + dcrUrlString);
@ -263,10 +266,10 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
@Override @Override
public void run() { public void run() {
int connectionDuration = MQTTEventAdapterConstants.initialReconnectDuration; int connectionDuration = MQTTEventAdapterConstants.INITIAL_RECONNECTION_DURATION;
while (!connectionSucceeded) { while (!connectionSucceeded) {
try { try {
connectionDuration = connectionDuration * MQTTEventAdapterConstants.reconnectionProgressionFactor; connectionDuration = connectionDuration * MQTTEventAdapterConstants.RECONNECTION_PROGRESS_FACTOR;
Thread.sleep(connectionDuration); Thread.sleep(connectionDuration);
startListener(); startListener();
connectionSucceeded = true; connectionSucceeded = true;

@ -47,8 +47,8 @@ public class MQTTEventAdapterConstants {
public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive"; public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive";
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 60000; public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 60000;
public static int initialReconnectDuration = 2000; public static final int INITIAL_RECONNECTION_DURATION = 4000;
public static final int reconnectionProgressionFactor = 2; public static final int RECONNECTION_PROGRESS_FACTOR = 2;
public static final String EMPTY_STRING = ""; public static final String EMPTY_STRING = "";
public static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer refresh_token"; public static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer refresh_token";

@ -203,9 +203,9 @@ public class XMPPAdapterListener implements Runnable {
@Override @Override
public void run() { public void run() {
while (!connectionSucceeded) { while (!connectionSucceeded) {
int connectionDuration = XMPPEventAdapterConstants.initialReconnectDuration; int connectionDuration = XMPPEventAdapterConstants.INITIAL_RECONNECTION_DURATION;
try { try {
connectionDuration = connectionDuration * XMPPEventAdapterConstants.reconnectionProgressionFactor; connectionDuration = connectionDuration * XMPPEventAdapterConstants.RECONNECTION_PROGRESS_FACTOR;
Thread.sleep(connectionDuration); Thread.sleep(connectionDuration);
startListener(); startListener();
connectionSucceeded = true; connectionSucceeded = true;

@ -49,8 +49,8 @@ public class XMPPEventAdapterConstants {
public static final int DEFAULT_XMPP_PORT = 5222; public static final int DEFAULT_XMPP_PORT = 5222;
public static final int DEFAULT_TIMEOUT_INTERVAL = 5000; public static final int DEFAULT_TIMEOUT_INTERVAL = 5000;
public static int initialReconnectDuration = 2000; public static int INITIAL_RECONNECTION_DURATION = 4000;
public static final int reconnectionProgressionFactor = 2; public static final int RECONNECTION_PROGRESS_FACTOR = 2;
public static final String DEFAULT = "default"; public static final String DEFAULT = "default";

Loading…
Cancel
Save