Merge pull request #88 from Shabirmean/IoTS-1.0.0-M1

Refactoring XMPP and MQTT Connectors for VirtualFireAlarm
Ruwan 9 years ago
commit c82ad8734e

@ -43,7 +43,7 @@ import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
/** /**
* This class reads the sonar reading and injects values * This class reads the humidity reading and injects values
* to the siddhiEngine for processing on a routine basis * to the siddhiEngine for processing on a routine basis
* also if the siddhiquery is updated the class takes * also if the siddhiquery is updated the class takes
* care of re-initializing same. * care of re-initializing same.
@ -104,8 +104,8 @@ public class SidhdhiQuery implements Runnable {
//Sending events to Siddhi //Sending events to Siddhi
try { try {
int sonarReading = AgentManager.getInstance().getTemperature(); int humidityReading = AgentManager.getInstance().getTemperature();
inputHandler.send(new Object[]{"FIRE_1", sonarReading}); inputHandler.send(new Object[]{"FIRE_1", humidityReading});
Thread.sleep(3000); Thread.sleep(3000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
@ -142,14 +142,14 @@ public class SidhdhiQuery implements Runnable {
/** /**
* Read sonar data from API URL * Read humidity data from API URL
* *
* @param sonarAPIUrl * @param humidityAPIUrl
* @return * @return
*/ */
private String readSonarData(String sonarAPIUrl) { private String readHumidityData(String humidityAPIUrl) {
HttpClient client = new DefaultHttpClient(); HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(sonarAPIUrl); HttpGet request = new HttpGet(humidityAPIUrl);
String responseStr = null; String responseStr = null;
try { try {
HttpResponse response = client.execute(request); HttpResponse response = client.execute(request);
@ -161,7 +161,7 @@ public class SidhdhiQuery implements Runnable {
} catch (IOException e) { } catch (IOException e) {
//log.error("Exception encountered while trying to make get request."); //log.error("Exception encountered while trying to make get request.");
log.error("Error while reading sonar reading from file!"); log.error("Error while reading humidity reading from file!");
return responseStr; return responseStr;
} }
return responseStr; return responseStr;

@ -103,6 +103,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
String deviceID = agentManager.getAgentConfigs().getDeviceId(); String deviceID = agentManager.getAgentConfigs().getDeviceId();
String receivedMessage; String receivedMessage;
String replyMessage; String replyMessage;
String securePayLoad;
try { try {
receivedMessage = AgentUtilOperations.extractMessageFromPayload(message.toString()); receivedMessage = AgentUtilOperations.extractMessageFromPayload(message.toString());
@ -116,56 +117,53 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
String[] controlSignal = receivedMessage.split(":"); String[] controlSignal = receivedMessage.split(":");
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") // message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
switch (controlSignal[0].toUpperCase()) { try {
case AgentConstants.BULB_CONTROL: switch (controlSignal[0].toUpperCase()) {
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON); case AgentConstants.BULB_CONTROL:
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON);
agentManager.changeAlarmStatus(stateToSwitch);
log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
break;
agentManager.changeAlarmStatus(stateToSwitch); case AgentConstants.TEMPERATURE_CONTROL:
log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'"); int currentTemperature = agentManager.getTemperature();
break;
case AgentConstants.TEMPERATURE_CONTROL: String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'";
int currentTemperature = agentManager.getTemperature(); log.info(AgentConstants.LOG_APPENDER + replyTemperature);
String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'"; String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC,
log.info(AgentConstants.LOG_APPENDER + replyTemperature); serverName, deviceOwner, deviceID);
String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
serverName, deviceOwner, deviceID); securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; publishToQueue(tempPublishTopic, securePayLoad);
break;
try { case AgentConstants.HUMIDITY_CONTROL:
publishToQueue(tempPublishTopic, replyMessage); int currentHumidity = agentManager.getHumidity();
} catch (TransportHandlerException e) {
log.error(AgentConstants.LOG_APPENDER +
"MQTT - Publishing, reply message to the MQTT Queue at: " +
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
}
break;
case AgentConstants.HUMIDITY_CONTROL: String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'";
int currentHumidity = agentManager.getHumidity(); log.info(AgentConstants.LOG_APPENDER + replyHumidity);
String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'"; String humidPublishTopic = String.format(
log.info(AgentConstants.LOG_APPENDER + replyHumidity); AgentConstants.MQTT_PUBLISH_TOPIC, serverName, deviceOwner, deviceID);
String humidPublishTopic = String.format( replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
AgentConstants.MQTT_PUBLISH_TOPIC, serverName, deviceOwner, deviceID); securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; publishToQueue(humidPublishTopic, securePayLoad);
break;
try { default:
publishToQueue(humidPublishTopic, replyMessage); log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
} catch (TransportHandlerException e) { "' is invalid and not-supported for this device-type");
log.error(AgentConstants.LOG_APPENDER + break;
"MQTT - Publishing, reply message to the MQTT Queue at: " + }
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed"); } catch (AgentCoreOperationException e) {
} log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
break; } catch (TransportHandlerException e) {
log.error(AgentConstants.LOG_APPENDER +
default: "MQTT - Publishing, reply message to the MQTT Queue at: " +
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] + agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
"' is invalid and not-supported for this device-type");
break;
} }
} }

@ -21,10 +21,12 @@ package org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.communication.xmpp
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Message;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.xmpp.XMPPTransportHandler;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentManager; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentManager;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.core.AgentUtilOperations;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.exception.AgentCoreOperationException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.agent.transport.xmpp.XMPPTransportHandler;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -94,8 +96,7 @@ public class FireAlarmXMPPCommunicator extends XMPPTransportHandler {
} }
}; };
connectorServiceHandler = service.scheduleAtFixedRate(connect, 0, timeoutInterval, connectorServiceHandler = service.scheduleAtFixedRate(connect, 0, timeoutInterval, TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS);
} }
/** /**
@ -110,63 +111,76 @@ public class FireAlarmXMPPCommunicator extends XMPPTransportHandler {
final AgentManager agentManager = AgentManager.getInstance(); final AgentManager agentManager = AgentManager.getInstance();
String from = xmppMessage.getFrom(); String from = xmppMessage.getFrom();
String message = xmppMessage.getBody(); String message = xmppMessage.getBody();
log.info(AgentConstants.LOG_APPENDER + "Received XMPP message [" + message + "] from " + String receivedMessage;
from);
String replyMessage; String replyMessage;
String[] controlSignal = message.split(":"); String securePayLoad;
try {
receivedMessage = AgentUtilOperations.extractMessageFromPayload(message);
log.info(AgentConstants.LOG_APPENDER + "Message [" + receivedMessage + "] was received");
} catch (AgentCoreOperationException e) {
log.warn(AgentConstants.LOG_APPENDER + "Could not extract message from payload.", e);
return;
}
String[] controlSignal = receivedMessage.split(":");
//message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format. (ex: "BULB:ON", "TEMPERATURE", "HUMIDITY") //message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format. (ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
try {
switch (controlSignal[0].toUpperCase()) {
case AgentConstants.BULB_CONTROL:
if (controlSignal.length != 2) {
replyMessage = "BULB controls need to be in the form - 'BULB:{ON|OFF}'";
log.warn(replyMessage);
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-REPLY");
break;
}
switch (controlSignal[0].toUpperCase()) { agentManager.changeAlarmStatus(controlSignal[1].equals(AgentConstants.CONTROL_ON));
case AgentConstants.BULB_CONTROL: log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
if (controlSignal.length != 2) {
replyMessage = "BULB controls need to be in the form - 'BULB:{ON|OFF}'";
log.warn(replyMessage);
sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-REPLY");
break; break;
}
agentManager.changeAlarmStatus(controlSignal[1].equals(AgentConstants.CONTROL_ON)); case AgentConstants.TEMPERATURE_CONTROL:
log.info(AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + int currentTemperature = agentManager.getTemperature();
controlSignal[1] + "'");
break;
case AgentConstants.TEMPERATURE_CONTROL: String replyTemperature =
int currentTemperature = agentManager.getTemperature(); "The current temperature was read to be: '" + currentTemperature +
"C'";
String replyTemperature = log.info(AgentConstants.LOG_APPENDER + replyTemperature);
"The current temperature was read to be: '" + currentTemperature +
"C'";
log.info(AgentConstants.LOG_APPENDER + replyTemperature);
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature; replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-REPLY"); securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
break; sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-REPLY");
break;
case AgentConstants.HUMIDITY_CONTROL: case AgentConstants.HUMIDITY_CONTROL:
int currentHumidity = agentManager.getHumidity(); int currentHumidity = agentManager.getHumidity();
String replyHumidity = String replyHumidity = "The current humidity was read to be: '" + currentHumidity + "%'";
"The current humidity was read to be: '" + currentHumidity + "%'"; log.info(AgentConstants.LOG_APPENDER + replyHumidity);
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity; replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-REPLY"); securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
break; sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-REPLY");
break;
default: default:
replyMessage = "'" + controlSignal[0] + replyMessage = "'" + controlSignal[0] + "' is invalid and not-supported for this device-type";
"' is invalid and not-supported for this device-type"; log.warn(replyMessage);
log.warn(replyMessage); securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
sendXMPPMessage(xmppAdminJID, replyMessage, "CONTROL-ERROR"); sendXMPPMessage(xmppAdminJID, securePayLoad, "CONTROL-ERROR");
break; break;
}
} catch (AgentCoreOperationException e) {
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
} }
} }
@Override @Override
public void processIncomingMessage() { public void publishDeviceData() {
final AgentManager agentManager = AgentManager.getInstance(); final AgentManager agentManager = AgentManager.getInstance();
int publishInterval = agentManager.getPushInterval(); int publishInterval = agentManager.getPushInterval();
@ -225,13 +239,15 @@ public class FireAlarmXMPPCommunicator extends XMPPTransportHandler {
terminatorThread.start(); terminatorThread.start();
} }
@Override @Override
public void publishDeviceData(String... publishData) { public void processIncomingMessage() {
} }
@Override @Override
public void publishDeviceData() { public void publishDeviceData(String... publishData) {
} }
} }

@ -27,8 +27,8 @@ public class VirtualFireAlarmConstants {
public static final String URL_PREFIX = "http://"; public static final String URL_PREFIX = "http://";
public static final String BULB_CONTEXT = "/BULB/"; public static final String BULB_CONTEXT = "/BULB/";
public static final String SONAR_CONTEXT = "/HUMIDITY/"; public static final String HUMIDITY_CONTEXT = "/HUMIDITY/";
public static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/"; public static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/";
public static final String SENSOR_TEMPERATURE = "temperature"; public static final String SENSOR_TEMP = "temperature";
} }

@ -43,12 +43,13 @@ import org.wso2.carbon.device.mgt.iot.exception.AccessTokenException;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException; import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorRecord;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.util.ZipArchive; import org.wso2.carbon.device.mgt.iot.util.ZipArchive;
import org.wso2.carbon.device.mgt.iot.util.ZipUtil; import org.wso2.carbon.device.mgt.iot.util.ZipUtil;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.dto.DeviceJSON; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.dto.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTSubscriber; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTConnector;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VirtualFireAlarmServiceUtils; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VirtualFireAlarmServiceUtils;
@ -99,7 +100,7 @@ public class VirtualFireAlarmService {
public static final String MQTT_PROTOCOL = "MQTT"; public static final String MQTT_PROTOCOL = "MQTT";
private VerificationManager verificationManager; private VerificationManager verificationManager;
private VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber; private VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector;
private VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector; private VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector;
private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<>();
@ -126,7 +127,7 @@ public class VirtualFireAlarmService {
@Override @Override
public void run() { public void run() {
virtualFireAlarmXMPPConnector.initConnector(); virtualFireAlarmXMPPConnector.initConnector();
virtualFireAlarmXMPPConnector.connectAndLogin(); virtualFireAlarmXMPPConnector.connect();
} }
}; };
@ -137,23 +138,22 @@ public class VirtualFireAlarmService {
/** /**
* *
* @param virtualFireAlarmMQTTSubscriber * @param virtualFireAlarmMQTTConnector
*/ */
public void setVirtualFireAlarmMQTTSubscriber( public void setVirtualFireAlarmMQTTConnector(
final VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) { final VirtualFireAlarmMQTTConnector virtualFireAlarmMQTTConnector) {
this.virtualFireAlarmMQTTSubscriber = virtualFireAlarmMQTTSubscriber; this.virtualFireAlarmMQTTConnector = virtualFireAlarmMQTTConnector;
Runnable xmppStarter = new Runnable() { // Runnable xmppStarter = new Runnable() {
@Override // @Override
public void run() { // public void run() {
virtualFireAlarmMQTTSubscriber.initConnector(); virtualFireAlarmMQTTConnector.connect();
virtualFireAlarmMQTTSubscriber.connect(); // }
} // };
}; //
// Thread xmppStarterThread = new Thread(xmppStarter);
Thread xmppStarterThread = new Thread(xmppStarter); // xmppStarterThread.setDaemon(true);
xmppStarterThread.setDaemon(true); // xmppStarterThread.start();
xmppStarterThread.start();
} }
/** /**
@ -176,8 +176,8 @@ public class VirtualFireAlarmService {
* *
* @return * @return
*/ */
public VirtualFireAlarmMQTTSubscriber getVirtualFireAlarmMQTTSubscriber() { public VirtualFireAlarmMQTTConnector getVirtualFireAlarmMQTTConnector() {
return virtualFireAlarmMQTTSubscriber; return virtualFireAlarmMQTTConnector;
} }
/* --------------------------------------------------------------------------------------- /* ---------------------------------------------------------------------------------------
@ -622,20 +622,22 @@ public class VirtualFireAlarmService {
VirtualFireAlarmServiceUtils.sendCommandViaHTTP(deviceHTTPEndpoint, callUrlPattern, true); VirtualFireAlarmServiceUtils.sendCommandViaHTTP(deviceHTTPEndpoint, callUrlPattern, true);
break; break;
case MQTT_PROTOCOL: case MQTT_PROTOCOL:
String resource = VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""); String mqttResource = VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", "");
virtualFireAlarmMQTTSubscriber.publishDeviceData(owner, deviceId, resource, switchToState); virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, mqttResource, switchToState);
break; break;
case XMPP_PROTOCOL: case XMPP_PROTOCOL:
VirtualFireAlarmServiceUtils.sendCommandViaXMPP(owner, deviceId, String xmppResource = VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", "");
VirtualFireAlarmConstants.BULB_CONTEXT, virtualFireAlarmXMPPConnector.publishDeviceData(owner, deviceId, xmppResource, switchToState);
switchToState, virtualFireAlarmXMPPConnector);
break; break;
default: default:
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
return; return;
} }
} catch (DeviceManagementException e) { } catch (DeviceManagementException | TransportHandlerException e) {
log.error("Failed to send switch-bulb request to device [" + deviceId + "] via " + protocolString); log.error("Failed to send switch-bulb request to device [" + deviceId + "] via " + protocolString);
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
return; return;
@ -653,23 +655,22 @@ public class VirtualFireAlarmService {
* @param response * @param response
* @return * @return
*/ */
@Path("controller/readsonar") @Path("controller/readhumidity")
@GET @GET
@Feature( code="VIRTUALFIREALARM_READSONAR", name="Read Sonar", @Feature( code="VIRTUALFIREALARM_READHUMIDITY", name="Read Humidity",
description="Read Sonar Readings from Virtual Fire Alarm") description="Read Humidity Readings from Virtual Fire Alarm")
public String requestSonarReading(@HeaderParam("owner") String owner, public String requestHumidity(@HeaderParam("owner") String owner,
@HeaderParam("deviceId") String deviceId, @HeaderParam("deviceId") String deviceId,
@HeaderParam("protocol") String protocol, @HeaderParam("protocol") String protocol,
@Context HttpServletResponse response) { @Context HttpServletResponse response) {
String replyMsg = ""; String replyMsg = "";
DeviceValidator deviceValidator = new DeviceValidator(); DeviceValidator deviceValidator = new DeviceValidator();
try { try {
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(
VirtualFireAlarmConstants deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) {
.DEVICE_TYPE))) {
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
return "Unauthorized Access"; return "Unauthorized Access Attempt";
} }
} catch (DeviceManagementException e) { } catch (DeviceManagementException e) {
replyMsg = e.getErrorMessage(); replyMsg = e.getErrorMessage();
@ -680,8 +681,7 @@ public class VirtualFireAlarmService {
String protocolString = protocol.toUpperCase(); String protocolString = protocol.toUpperCase();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Sending request to read sonar value of device [" + deviceId + "] via " + log.debug("Sending request to read humidity value of device [" + deviceId + "] via " + protocolString);
protocolString);
} }
try { try {
@ -689,26 +689,24 @@ public class VirtualFireAlarmService {
case HTTP_PROTOCOL: case HTTP_PROTOCOL:
String deviceHTTPEndpoint = deviceToIpMap.get(deviceId); String deviceHTTPEndpoint = deviceToIpMap.get(deviceId);
if (deviceHTTPEndpoint == null) { if (deviceHTTPEndpoint == null) {
replyMsg = replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner;
"IP not registered for device: " + deviceId + " of owner: " + owner;
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
return replyMsg; return replyMsg;
} }
replyMsg = VirtualFireAlarmServiceUtils.sendCommandViaHTTP(deviceHTTPEndpoint, replyMsg = VirtualFireAlarmServiceUtils.sendCommandViaHTTP(deviceHTTPEndpoint,
VirtualFireAlarmConstants.SONAR_CONTEXT, VirtualFireAlarmConstants.HUMIDITY_CONTEXT,
false); false);
break; break;
case MQTT_PROTOCOL: case MQTT_PROTOCOL:
String resource = VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""); String mqttResource = VirtualFireAlarmConstants.HUMIDITY_CONTEXT.replace("/", "");
virtualFireAlarmMQTTSubscriber.publishDeviceData(owner, deviceId, resource, ""); virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, mqttResource, "");
break; break;
case XMPP_PROTOCOL: case XMPP_PROTOCOL:
VirtualFireAlarmServiceUtils.sendCommandViaXMPP(owner, deviceId, String xmppResource = VirtualFireAlarmConstants.HUMIDITY_CONTEXT.replace("/", "");
VirtualFireAlarmConstants.SONAR_CONTEXT, "", virtualFireAlarmXMPPConnector.publishDeviceData(owner, deviceId, xmppResource, "");
virtualFireAlarmXMPPConnector);
break; break;
default: default:
@ -716,14 +714,14 @@ public class VirtualFireAlarmService {
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
return replyMsg; return replyMsg;
} }
} catch (DeviceManagementException e) { } catch (DeviceManagementException | TransportHandlerException e) {
replyMsg = e.getErrorMessage(); replyMsg = e.getMessage();
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
return replyMsg; return replyMsg;
} }
response.setStatus(Response.Status.OK.getStatusCode()); response.setStatus(Response.Status.OK.getStatusCode());
replyMsg = "The current sonar reading of the device is " + replyMsg; replyMsg = "The current humidity reading of the device is " + replyMsg;
return replyMsg; return replyMsg;
} }
@ -750,9 +748,8 @@ public class VirtualFireAlarmService {
DeviceValidator deviceValidator = new DeviceValidator(); DeviceValidator deviceValidator = new DeviceValidator();
try { try {
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, if (!deviceValidator.isExist(owner, SUPER_TENANT,
VirtualFireAlarmConstants new DeviceIdentifier(deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) {
.DEVICE_TYPE))) {
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode()); response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
} }
} catch (DeviceManagementException e) { } catch (DeviceManagementException e) {
@ -762,9 +759,8 @@ public class VirtualFireAlarmService {
String protocolString = protocol.toUpperCase(); String protocolString = protocol.toUpperCase();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug( log.debug("Sending request to read virtual-firealarm-temperature of device " +
"Sending request to read virtual-firealarm-temperature of device [" + deviceId + "[" + deviceId + "] via " + protocolString);
"] via " + protocolString);
} }
try { try {
@ -775,35 +771,33 @@ public class VirtualFireAlarmService {
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
} }
String temperatureValue = VirtualFireAlarmServiceUtils. String temperatureValue = VirtualFireAlarmServiceUtils.sendCommandViaHTTP(
sendCommandViaHTTP(deviceHTTPEndpoint, deviceHTTPEndpoint,
VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT,
false); false);
SensorDataManager.getInstance().setSensorRecord(deviceId, SensorDataManager.getInstance().setSensorRecord(deviceId,
VirtualFireAlarmConstants.SENSOR_TEMPERATURE, VirtualFireAlarmConstants.SENSOR_TEMP,
temperatureValue, temperatureValue,
Calendar.getInstance().getTimeInMillis()); Calendar.getInstance().getTimeInMillis());
break; break;
case MQTT_PROTOCOL: case MQTT_PROTOCOL:
String resource = VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""); String mqttResource = VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", "");
virtualFireAlarmMQTTSubscriber.publishDeviceData(owner, deviceId, resource, ""); virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, mqttResource, "");
break; break;
case XMPP_PROTOCOL: case XMPP_PROTOCOL:
VirtualFireAlarmServiceUtils.sendCommandViaXMPP(owner, deviceId, String xmppResource = VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", "");
VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "", virtualFireAlarmMQTTConnector.publishDeviceData(owner, deviceId, xmppResource, "");
virtualFireAlarmXMPPConnector);
break; break;
default: default:
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode()); response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
} }
sensorRecord = SensorDataManager.getInstance().getSensorRecord(deviceId, sensorRecord = SensorDataManager.getInstance().getSensorRecord(deviceId,
VirtualFireAlarmConstants VirtualFireAlarmConstants.SENSOR_TEMP);
.SENSOR_TEMPERATURE); } catch (DeviceManagementException | DeviceControllerException | TransportHandlerException e) {
} catch (DeviceManagementException | DeviceControllerException e) {
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
} }
@ -829,18 +823,16 @@ public class VirtualFireAlarmService {
if (registeredIp == null) { if (registeredIp == null) {
log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " + log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " +
deviceIp + " for device ID - " + deviceId); deviceIp + " for device ID - " + deviceId);
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode()); response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
return; return;
} else if (!registeredIp.equals(deviceIp)) { } else if (!registeredIp.equals(deviceIp)) {
log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " + log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " + deviceId +
deviceId + " is already registered under some other IP. Re-registration required");
" is already registered under some other IP. Re-registration " +
"required");
response.setStatus(Response.Status.CONFLICT.getStatusCode()); response.setStatus(Response.Status.CONFLICT.getStatusCode());
return; return;
} }
SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMPERATURE, SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP,
String.valueOf(temperature), String.valueOf(temperature),
Calendar.getInstance().getTimeInMillis()); Calendar.getInstance().getTimeInMillis());

@ -22,10 +22,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager; import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttSubscriber;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler; import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
@ -41,27 +39,23 @@ import java.security.PublicKey;
import java.util.Calendar; import java.util.Calendar;
import java.util.UUID; import java.util.UUID;
public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler { @SuppressWarnings("no JAX-WS annotation")
private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTSubscriber.class); public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTConnector.class);
private static final String serverName = private static String serverName = DeviceManagementConfigurationManager.getInstance().
DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName(); getDeviceManagementServerInfo().getName();
private static final String subscribeTopic =
serverName + File.separator + "+" + File.separator + VirtualFireAlarmConstants.DEVICE_TYPE +
File.separator + "+" + File.separator + "publisher";
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5); private static String subscribeTopic = serverName + File.separator + "+" + File.separator +
private String mqttEndpoint; VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator + "publisher";
private VirtualFireAlarmMQTTSubscriber() { private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
private VirtualFireAlarmMQTTConnector() {
super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE,
MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic); MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic);
} }
public void initConnector() {
mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
}
@Override @Override
public void connect() { public void connect() {
Runnable connector = new Runnable() { Runnable connector = new Runnable() {
@ -75,8 +69,7 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler {
try { try {
Thread.sleep(timeoutInterval); Thread.sleep(timeoutInterval);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
//TODO: Need to print exception log.error("MQTT-Subscriber: Thread Sleep Interrupt Exception.", ex);
log.error("MQTT-Subscriber: Thread Sleep Interrupt Exception");
} }
} }
} }
@ -126,7 +119,7 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler {
} else if (actualMessage.contains("TEMPERATURE")) { } else if (actualMessage.contains("TEMPERATURE")) {
String temperatureValue = actualMessage.split(":")[1]; String temperatureValue = actualMessage.split(":")[1];
SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMPERATURE, SensorDataManager.getInstance().setSensorRecord(deviceId, VirtualFireAlarmConstants.SENSOR_TEMP,
temperatureValue, temperatureValue,
Calendar.getInstance().getTimeInMillis()); Calendar.getInstance().getTimeInMillis());
} }
@ -137,22 +130,13 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler {
} }
} }
@Override
public void publishDeviceData() {
}
@Override
public void processIncomingMessage() {
}
@Override @Override
public void publishDeviceData(String... publishData) { public void publishDeviceData(String... publishData) throws TransportHandlerException {
if (publishData.length != 4) { if (publishData.length != 4) {
String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " +
"Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]";
log.error(errorMsg);
throw new TransportHandlerException(errorMsg);
} }
String deviceOwner = publishData[0]; String deviceOwner = publishData[0];
@ -181,9 +165,10 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler {
publishToQueue(publishTopic, pushMessage); publishToQueue(publishTopic, pushMessage);
} catch (VirtualFireAlarmException e) { } catch (VirtualFireAlarmException e) {
log.error("Preparing Secure payload failed", e); String errorMsg = "Preparing Secure payload failed for device - [" + deviceId + "] of owner - " +
} catch (TransportHandlerException e) { "[" + deviceOwner + "].";
log.warn("Data Publish attempt to topic - [" + publishTopic + "] failed for payload [" + pushMessage + "]"); log.error(errorMsg);
throw new TransportHandlerException(errorMsg, e);
} }
} }
@ -197,13 +182,15 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler {
closeConnection(); closeConnection();
} catch (MqttException e) { } catch (MqttException e) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.warn("Unable to 'STOP' MQTT connection at broker at: " + mqttBrokerEndPoint); log.warn("Unable to 'STOP' MQTT connection at broker at: " + mqttBrokerEndPoint
+ " for device-type - " + VirtualFireAlarmConstants.DEVICE_TYPE, e);
} }
try { try {
Thread.sleep(timeoutInterval); Thread.sleep(timeoutInterval);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
log.error("MQTT-Terminator: Thread Sleep Interrupt Exception"); log.error("MQTT-Terminator: Thread Sleep Interrupt Exception at device-type - " +
VirtualFireAlarmConstants.DEVICE_TYPE, e1);
} }
} }
} }
@ -214,4 +201,26 @@ public class VirtualFireAlarmMQTTSubscriber extends MQTTTransportHandler {
terminatorThread.setDaemon(true); terminatorThread.setDaemon(true);
terminatorThread.start(); terminatorThread.start();
} }
@Override
public void publishDeviceData() {
// nothing to do
}
@Override
public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException {
// nothing to do
}
@Override
public void processIncomingMessage() {
// nothing to do
}
@Override
public void processIncomingMessage(MqttMessage message) throws TransportHandlerException {
// nothing to do
}
} }

@ -20,49 +20,122 @@ package org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jivesoftware.smack.packet.Message; import org.jivesoftware.smack.packet.Message;
import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppAccount;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConnector; import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppServerClient;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager; import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
import org.wso2.carbon.device.mgt.iot.transport.xmpp.XMPPTransportHandler;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VirtualFireAlarmServiceUtils; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VirtualFireAlarmServiceUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.Calendar; import java.util.Calendar;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class VirtualFireAlarmXMPPConnector extends XmppConnector { @SuppressWarnings("no JAX-WS annotation")
public class VirtualFireAlarmXMPPConnector extends XMPPTransportHandler {
private static Log log = LogFactory.getLog(VirtualFireAlarmXMPPConnector.class); private static Log log = LogFactory.getLog(VirtualFireAlarmXMPPConnector.class);
private static String xmppServerIP; private static String xmppServerIP;
// private static int xmppServerPort; private static String xmppVFireAlarmAdminUsername;
private static String xmppAdminUsername; private static String xmppVFireAlarmAdminAccountJID;
private static String xmppAdminPassword; private static final String V_FIREALARM_XMPP_PASSWORD = "vfirealarm@123";
private static String xmppAdminAccountJID; private ScheduledFuture<?> connectorServiceHandler;
private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
private VirtualFireAlarmXMPPConnector() { private VirtualFireAlarmXMPPConnector() {
super(XmppConfig.getInstance().getXmppServerIP(), super(XmppConfig.getInstance().getXmppServerIP(), XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
} }
public void initConnector() { public void initConnector() {
String serverName =
DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName();
xmppVFireAlarmAdminUsername = serverName + "_" + VirtualFireAlarmConstants.DEVICE_TYPE;
xmppServerIP = XmppConfig.getInstance().getXmppServerIP(); xmppServerIP = XmppConfig.getInstance().getXmppServerIP();
xmppAdminUsername = XmppConfig.getInstance().getXmppUsername(); xmppVFireAlarmAdminAccountJID = xmppVFireAlarmAdminUsername + "@" + xmppServerIP;
xmppAdminPassword = XmppConfig.getInstance().getXmppPassword(); createXMPPAccountForDeviceType();
xmppAdminAccountJID = xmppAdminUsername + "@" + xmppServerIP;
} }
public void connectAndLogin() { public void createXMPPAccountForDeviceType() {
boolean accountExists = false;
XmppServerClient xmppServerClient = new XmppServerClient();
try { try {
super.connectAndLogin(xmppAdminUsername, xmppAdminPassword, null); accountExists = xmppServerClient.doesXMPPUserAccountExist(xmppVFireAlarmAdminUsername);
super.setMessageFilterOnReceiver(xmppAdminAccountJID); } catch (DeviceControllerException e) {
} catch (DeviceManagementException e) { String errorMsg = "An error was encountered whilst trying to check whether Server XMPP account exists " +
log.error("Connect/Login attempt to XMPP Server at: " + xmppServerIP + " failed"); "for device-type - " + VirtualFireAlarmConstants.DEVICE_TYPE;
retryXMPPConnection(); log.error(errorMsg, e);
} }
if (!accountExists) {
XmppAccount xmppAccount = new XmppAccount();
xmppAccount.setAccountName(xmppVFireAlarmAdminUsername);
xmppAccount.setUsername(xmppVFireAlarmAdminUsername);
xmppAccount.setPassword(V_FIREALARM_XMPP_PASSWORD);
xmppAccount.setEmail("");
try {
boolean xmppCreated = xmppServerClient.createXMPPAccount(xmppAccount);
if (!xmppCreated) {
log.warn("Server XMPP Account was not created for device-type - " +
VirtualFireAlarmConstants.DEVICE_TYPE +
". Check whether XMPP is enabled in \"devicemgt-config.xml\" & restart.");
} else {
log.info("Server XMPP Account [" + xmppVFireAlarmAdminUsername +
"] was not created for device-type - " + VirtualFireAlarmConstants.DEVICE_TYPE);
}
} catch (DeviceControllerException e) {
String errorMsg =
"An error was encountered whilst trying to create Server XMPP account for device-type - "
+ VirtualFireAlarmConstants.DEVICE_TYPE;
log.error(errorMsg, e);
}
}
}
@Override
public void connect() {
Runnable connector = new Runnable() {
public void run() {
if (!isConnected()) {
try {
connectToServer();
loginToServer(xmppVFireAlarmAdminUsername, V_FIREALARM_XMPP_PASSWORD, null);
setFilterOnReceiver(xmppVFireAlarmAdminAccountJID);
} catch (TransportHandlerException e) {
if (log.isDebugEnabled()) {
log.warn("Connection/Login to XMPP server at: " + server + " as " +
xmppVFireAlarmAdminUsername + " failed for device-type [" +
VirtualFireAlarmConstants.DEVICE_TYPE + "].", e);
}
}
}
}
};
connectorServiceHandler = service.scheduleAtFixedRate(connector, 0, timeoutInterval, TimeUnit.MILLISECONDS);
} }
@Override @Override
protected void processXMPPMessage(Message xmppMessage) { public void processIncomingMessage(Message xmppMessage) throws TransportHandlerException {
String from = xmppMessage.getFrom(); String from = xmppMessage.getFrom();
String subject = xmppMessage.getSubject(); String subject = xmppMessage.getSubject();
String message = xmppMessage.getBody(); String message = xmppMessage.getBody();
@ -75,80 +148,139 @@ public class VirtualFireAlarmXMPPConnector extends XmppConnector {
String owner = from.substring(indexOfSlash + 1, from.length()); String owner = from.substring(indexOfSlash + 1, from.length());
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received XMPP message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}"); log.debug("Received XMPP message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
} }
if (subject != null) { try {
switch (subject) { PublicKey clientPublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId);
case "PUBLISHER": PrivateKey serverPrivateKey = VerificationManager.getServerPrivateKey();
float temperature = Float.parseFloat(message.split(":")[1]); String actualMessage = VirtualFireAlarmServiceUtils.extractMessageFromPayload(message, serverPrivateKey,
if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) { clientPublicKey);
log.error("XMPP Connector: Publishing data to DAS failed."); if (log.isDebugEnabled()) {
} log.debug("XMPP: Received Message [" + actualMessage + "] from: [" + from + "]");
}
if (log.isDebugEnabled()) { if (subject != null) {
log.debug("XMPP: Publisher Message [" + message + "] from [" + from + "]"); switch (subject) {
log.debug("XMPP Connector: Published data to DAS successfully."); case "PUBLISHER":
} float temperature = Float.parseFloat(actualMessage.split(":")[1]);
break; if (!VirtualFireAlarmServiceUtils.publishToDAS(owner, deviceId, temperature)) {
case "CONTROL-REPLY": log.error("XMPP Connector: Publishing VirtualFirealarm data to DAS failed.");
if (log.isDebugEnabled()) { }
log.debug("XMPP: Reply Message [" + message + "] from [" + from + "]");
} if (log.isDebugEnabled()) {
String tempVal = message.split(":")[1]; log.debug("XMPP: Publisher Message [" + actualMessage + "] from [" + from + "] " +
SensorDataManager.getInstance().setSensorRecord(deviceId, "was successfully published to DAS");
VirtualFireAlarmConstants.SENSOR_TEMPERATURE, }
tempVal, break;
Calendar.getInstance().getTimeInMillis());
break; case "CONTROL-REPLY":
default: String tempVal = actualMessage.split(":")[1];
if (log.isDebugEnabled()) { SensorDataManager.getInstance().setSensorRecord(deviceId,
log.warn("Unknown XMPP Message [" + message + "] from [" + from + "] received"); VirtualFireAlarmConstants.SENSOR_TEMP,
} tempVal,
break; Calendar.getInstance().getTimeInMillis());
break;
default:
if (log.isDebugEnabled()) {
log.warn("Unknown XMPP Message [" + actualMessage + "] from [" + from + "] received");
}
break;
}
} }
} catch (VirtualFireAlarmException e) {
String errorMsg =
"CertificateManagementService failure oo Signature-Verification/Decryption was unsuccessful.";
log.error(errorMsg, e);
} }
} else { } else {
log.warn("Received XMPP message from client with unexpected JID [" + from + "]."); log.warn("Received XMPP message from client with unexpected JID [" + from + "].");
} }
} }
private void retryXMPPConnection() { @Override
Thread retryToConnect = new Thread() { public void publishDeviceData(String... publishData) throws TransportHandlerException {
@Override if (publishData.length != 4) {
public void run() { String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " +
"Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]";
log.error(errorMsg);
throw new TransportHandlerException(errorMsg);
}
while (true) { String deviceOwner = publishData[0];
if (!isConnected()) { String deviceId = publishData[1];
if (log.isDebugEnabled()) { String resource = publishData[2];
log.debug("Re-trying to reach XMPP Server...."); String state = publishData[3];
}
try { try {
VirtualFireAlarmXMPPConnector.super.connectAndLogin(xmppAdminUsername, PublicKey devicePublicKey = VirtualFireAlarmServiceUtils.getDevicePublicKey(deviceId);
xmppAdminPassword, PrivateKey serverPrivateKey = VerificationManager.getServerPrivateKey();
null);
VirtualFireAlarmXMPPConnector.super.setMessageFilterOnReceiver( String actualMessage = resource + ":" + state;
xmppAdminAccountJID); String encryptedMsg = VirtualFireAlarmServiceUtils.prepareSecurePayLoad(actualMessage,
} catch (DeviceManagementException e1) { devicePublicKey,
if (log.isDebugEnabled()) { serverPrivateKey);
log.debug("Attempt to re-connect to XMPP-Server failed");
} String clientToConnect = deviceId + "@" + xmppServerIP + File.separator + deviceOwner;
} sendXMPPMessage(clientToConnect, encryptedMsg, "CONTROL-REQUEST");
} else {
break; } catch (VirtualFireAlarmException e) {
String errorMsg = "Preparing Secure payload failed for device - [" + deviceId + "] of owner - " +
"[" + deviceOwner + "].";
log.error(errorMsg);
throw new TransportHandlerException(errorMsg, e);
}
}
@Override
public void disconnect() {
Runnable stopConnection = new Runnable() {
public void run() {
while (isConnected()) {
connectorServiceHandler.cancel(true);
closeConnection();
if (log.isDebugEnabled()) {
log.warn("Unable to 'STOP' connection to XMPP server at: " + server +
" for user - " + xmppVFireAlarmAdminUsername);
} }
try { try {
Thread.sleep(5000); Thread.sleep(timeoutInterval);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
log.error("XMPP: Thread Sleep Interrupt Exception"); log.error("XMPP-Terminator: Thread Sleep Interrupt Exception for "
+ VirtualFireAlarmConstants.DEVICE_TYPE + " type.", e1);
} }
} }
} }
}; };
retryToConnect.setDaemon(true); Thread terminatorThread = new Thread(stopConnection);
retryToConnect.start(); terminatorThread.setDaemon(true);
terminatorThread.start();
}
@Override
public void processIncomingMessage(Message message, String... messageParams) throws TransportHandlerException {
// nothing to do
}
@Override
public void processIncomingMessage() throws TransportHandlerException {
// nothing to do
}
@Override
public void publishDeviceData() throws TransportHandlerException {
// nothing to do
}
@Override
public void publishDeviceData(Message publishData) throws TransportHandlerException {
// nothing to do
} }
} }

@ -32,18 +32,11 @@ import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException; import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService; import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
import org.wso2.carbon.device.mgt.common.DeviceManagementException; import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.DeviceController;
import org.wso2.carbon.device.mgt.iot.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.plugin.constants.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.VirtualFireAlarmService;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException; import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.exception.VirtualFireAlarmException;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTSubscriber;
import org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector;
import javax.ws.rs.HttpMethod; import javax.ws.rs.HttpMethod;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@ -157,27 +150,6 @@ public class VirtualFireAlarmServiceUtils {
return responseMsg; return responseMsg;
} }
public static void sendCommandViaXMPP(String deviceOwner, String deviceId, String resource,
String state, VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector)
throws DeviceManagementException {
String xmppServerDomain = XmppConfig.getInstance().getXmppEndpoint();
int indexOfChar = xmppServerDomain.lastIndexOf(File.separator);
if (indexOfChar != -1) {
xmppServerDomain = xmppServerDomain.substring((indexOfChar + 1), xmppServerDomain.length());
}
indexOfChar = xmppServerDomain.indexOf(":");
if (indexOfChar != -1) {
xmppServerDomain = xmppServerDomain.substring(0, indexOfChar);
}
String clientToConnect = deviceId + "@" + xmppServerDomain + File.separator + deviceOwner;
String message = resource.replace("/", "") + ":" + state;
virtualFireAlarmXMPPConnector.sendXMPPMessage(clientToConnect, message, "CONTROL-REQUEST");
}
/* --------------------------------------------------------------------------------------- /* ---------------------------------------------------------------------------------------
Utility methods relevant to creating and sending http requests Utility methods relevant to creating and sending http requests
--------------------------------------------------------------------------------------- */ --------------------------------------------------------------------------------------- */

@ -87,8 +87,8 @@
<Resource> <Resource>
<AuthType>Any</AuthType> <AuthType>Any</AuthType>
<HttpVerb>GET</HttpVerb> <HttpVerb>GET</HttpVerb>
<Uri>http://localhost:9763/virtual_firealarm/controller/controller/readsonar</Uri> <Uri>http://localhost:9763/virtual_firealarm/controller/controller/readhumidity</Uri>
<UriTemplate>/controller/readsonar</UriTemplate> <UriTemplate>/controller/readhumidity</UriTemplate>
</Resource> </Resource>
<Resource> <Resource>
<AuthType>Any</AuthType> <AuthType>Any</AuthType>

@ -24,26 +24,12 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd"> http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd">
<!--<jaxrs:server id="VirtualFireAlarmController" address="/controller">-->
<!--<jaxrs:serviceBeans>-->
<!--<bean id="VirtualFireAlarmControllerService"-->
<!--class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmControllerService">-->
<!--<property name="virtualFireAlarmMQTTSubscriber" ref="mqttSubscriberBean"/>-->
<!--<property name="virtualFireAlarmXMPPConnector" ref="xmppConnectorBean"/>-->
<!--</bean>-->
<!--</jaxrs:serviceBeans>-->
<!--<jaxrs:providers>-->
<!--<bean class="org.codehaus.jackson.jaxrs.JacksonJsonProvider" />-->
<!--</jaxrs:providers>-->
<!--</jaxrs:server>-->
<jaxrs:server id="VirtualFireAlarm" address="/"> <jaxrs:server id="VirtualFireAlarm" address="/">
<jaxrs:serviceBeans> <jaxrs:serviceBeans>
<bean id="VirtualFireAlarmService" <bean id="VirtualFireAlarmService"
class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.VirtualFireAlarmService"> class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.VirtualFireAlarmService">
<property name="verificationManager" ref="verificationManagerBean"/> <property name="verificationManager" ref="verificationManagerBean"/>
<property name="virtualFireAlarmMQTTSubscriber" ref="mqttSubscriberBean"/> <property name="virtualFireAlarmMQTTConnector" ref="mqttConnectorBean"/>
<property name="virtualFireAlarmXMPPConnector" ref="xmppConnectorBean"/> <property name="virtualFireAlarmXMPPConnector" ref="xmppConnectorBean"/>
</bean> </bean>
</jaxrs:serviceBeans> </jaxrs:serviceBeans>
@ -55,8 +41,8 @@
<bean id="verificationManagerBean" <bean id="verificationManagerBean"
class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager"> class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.util.VerificationManager">
</bean> </bean>
<bean id="mqttSubscriberBean" <bean id="mqttConnectorBean"
class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTSubscriber"> class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmMQTTConnector">
</bean> </bean>
<bean id="xmppConnectorBean" <bean id="xmppConnectorBean"
class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector"> class="org.wso2.carbon.device.mgt.iot.virtualfirealarm.service.transport.VirtualFireAlarmXMPPConnector">

@ -70,8 +70,7 @@ public class DeviceController {
Class<?> dataStoreClass = Class.forName(handlerClass); Class<?> dataStoreClass = Class.forName(handlerClass);
if (DataStoreConnector.class.isAssignableFrom(dataStoreClass)) { if (DataStoreConnector.class.isAssignableFrom(dataStoreClass)) {
DataStoreConnector dataStoreConnector = DataStoreConnector dataStoreConnector = (DataStoreConnector) dataStoreClass.newInstance();
(DataStoreConnector) dataStoreClass.newInstance();
String dataStoreName = dataStore.getName(); String dataStoreName = dataStore.getName();
if (dataStore.isEnabled()) { if (dataStore.isEnabled()) {
dataStoresMap.put(dataStoreName, dataStoreConnector); dataStoresMap.put(dataStoreName, dataStoreConnector);

@ -18,11 +18,6 @@
package org.wso2.carbon.device.mgt.iot.controlqueue.xmpp; package org.wso2.carbon.device.mgt.iot.controlqueue.xmpp;
import java.util.Map;
/**
* Created by smean-MAC on 7/24/15.
*/
public class XmppAccount { public class XmppAccount {
private String username; private String username;
private String password; private String password;

@ -25,6 +25,7 @@ import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity; import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
@ -33,6 +34,7 @@ import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.exception.IoTException; import org.wso2.carbon.device.mgt.iot.exception.IoTException;
import org.wso2.carbon.device.mgt.iot.util.IoTUtil; import org.wso2.carbon.device.mgt.iot.util.IoTUtil;
import javax.ws.rs.core.MediaType;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -46,7 +48,9 @@ public class XmppServerClient implements ControlQueueConnector {
private static final String XMPP_SERVER_API_CONTEXT = "/plugins/restapi/v1"; private static final String XMPP_SERVER_API_CONTEXT = "/plugins/restapi/v1";
private static final String USERS_API = "/users"; private static final String USERS_API = "/users";
@SuppressWarnings("unused")
private static final String GROUPS_API = "/groups"; private static final String GROUPS_API = "/groups";
@SuppressWarnings("unused")
private static final String APPLICATION_JSON_MT = "application/json"; private static final String APPLICATION_JSON_MT = "application/json";
private String xmppEndpoint; private String xmppEndpoint;
@ -68,9 +72,7 @@ public class XmppServerClient implements ControlQueueConnector {
@Override @Override
public void enqueueControls(HashMap<String, String> deviceControls) public void enqueueControls(HashMap<String, String> deviceControls)
throws DeviceControllerException { throws DeviceControllerException {
if (xmppEnabled) { if (!xmppEnabled) {
} else {
log.warn("XMPP <Enabled> set to false in 'devicemgt-config.xml'"); log.warn("XMPP <Enabled> set to false in 'devicemgt-config.xml'");
} }
} }
@ -79,8 +81,7 @@ public class XmppServerClient implements ControlQueueConnector {
if (xmppEnabled) { if (xmppEnabled) {
String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API; String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("The API Endpoint URL of the XMPP Server is set to: " + log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint);
xmppUsersAPIEndpoint);
} }
String encodedString = xmppUsername + ":" + xmppPassword; String encodedString = xmppUsername + ":" + xmppPassword;
@ -104,22 +105,23 @@ public class XmppServerClient implements ControlQueueConnector {
" ]" + " ]" +
" }" + " }" +
"}"; "}";
StringEntity requestEntity = null;
StringEntity requestEntity;
try { try {
requestEntity = new StringEntity(jsonRequest,"application/json","UTF-8"); requestEntity = new StringEntity(jsonRequest, MediaType.APPLICATION_JSON , StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
return false; return false;
} }
URL xmppUserApiUrl = null; URL xmppUserApiUrl;
try { try {
xmppUserApiUrl = new URL(xmppUsersAPIEndpoint); xmppUserApiUrl = new URL(xmppUsersAPIEndpoint);
} catch (MalformedURLException e) { } catch (MalformedURLException e) {
String errMsg = "Malformed URL + " + xmppUsersAPIEndpoint; String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint;
log.error(errMsg); log.error(errMsg);
throw new DeviceControllerException(errMsg); throw new DeviceControllerException(errMsg);
} }
HttpClient httpClient = null; HttpClient httpClient;
try { try {
httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol()); httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol());
} catch (Exception e) { } catch (Exception e) {
@ -127,13 +129,11 @@ public class XmppServerClient implements ControlQueueConnector {
+ xmppUserApiUrl.getProtocol()); + xmppUserApiUrl.getProtocol());
return false; return false;
} }
HttpPost httpPost = new HttpPost(xmppUsersAPIEndpoint);
HttpPost httpPost = new HttpPost(xmppUsersAPIEndpoint);
httpPost.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader); httpPost.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader);
httpPost.setEntity(requestEntity); httpPost.setEntity(requestEntity);
try { try {
HttpResponse httpResponse = httpClient.execute(httpPost); HttpResponse httpResponse = httpClient.execute(httpPost);
@ -159,4 +159,69 @@ public class XmppServerClient implements ControlQueueConnector {
return false; return false;
} }
} }
public boolean doesXMPPUserAccountExist(String username) throws DeviceControllerException {
if (xmppEnabled) {
String xmppUsersAPIEndpoint = xmppEndpoint + XMPP_SERVER_API_CONTEXT + USERS_API + "/" + username;
if (log.isDebugEnabled()) {
log.debug("The API Endpoint URL of the XMPP Server is set to: " + xmppUsersAPIEndpoint);
}
String encodedString = xmppUsername + ":" + xmppPassword;
encodedString = new String(Base64.encodeBase64(encodedString.getBytes(StandardCharsets.UTF_8)));
String authorizationHeader = "Basic " + encodedString;
URL xmppUserApiUrl;
try {
xmppUserApiUrl = new URL(xmppUsersAPIEndpoint);
} catch (MalformedURLException e) {
String errMsg = "Malformed XMPP URL + " + xmppUsersAPIEndpoint;
log.error(errMsg);
throw new DeviceControllerException(errMsg, e);
}
HttpClient httpClient;
try {
httpClient = IoTUtil.getHttpClient(xmppUserApiUrl.getPort(), xmppUserApiUrl.getProtocol());
} catch (Exception e) {
String errorMsg = "Error on getting a http client for port :" + xmppUserApiUrl.getPort() +
" protocol :" + xmppUserApiUrl.getProtocol();
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
HttpGet httpGet = new HttpGet(xmppUsersAPIEndpoint);
httpGet.addHeader(HttpHeaders.AUTHORIZATION, authorizationHeader);
try {
HttpResponse httpResponse = httpClient.execute(httpGet);
if (httpResponse.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
String response = IoTUtil.getResponseString(httpResponse);
if (log.isDebugEnabled()) {
log.debug("XMPP Server returned status: '" + httpResponse.getStatusLine().getStatusCode() +
"' for checking existence of account [" + username + "] with message:\n" +
response + "\nProbably, an account with this username does not exist.");
}
return false;
}
} catch (IOException | IoTException e) {
String errorMsg = "Error occured whilst trying a 'GET' at : " + xmppUsersAPIEndpoint +
"\nError: " + e.getMessage();
log.error(errorMsg);
throw new DeviceControllerException(errorMsg, e);
}
if (log.isDebugEnabled()) {
log.debug("XMPP Server already has an account for the username - [" + username + "].");
}
return true;
} else {
String warnMsg = "XMPP <Enabled> set to false in 'devicemgt-config.xml'";
log.warn(warnMsg);
throw new DeviceControllerException(warnMsg);
}
}
} }

@ -34,14 +34,17 @@ public interface TransportHandler<T> {
boolean isConnected(); boolean isConnected();
//TODO:: Any errors needs to be thrown ahead void processIncomingMessage() throws TransportHandlerException;
void processIncomingMessage(T message, String... messageParams);
void processIncomingMessage(); void processIncomingMessage(T message) throws TransportHandlerException;
void publishDeviceData(String... publishData); void processIncomingMessage(T message, String... messageParams) throws TransportHandlerException;
void publishDeviceData(); void publishDeviceData() throws TransportHandlerException;
void publishDeviceData(T publishData) throws TransportHandlerException;
void publishDeviceData(String... publishData) throws TransportHandlerException;
void disconnect(); void disconnect();
} }

@ -309,12 +309,17 @@ public abstract class MQTTTransportHandler
@Override @Override
public void messageArrived(final String topic, final MqttMessage mqttMessage) { public void messageArrived(final String topic, final MqttMessage mqttMessage) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.info("Got an MQTT message '" + mqttMessage.toString() + "' for topic '" + topic + "'."); log.debug("Got an MQTT message '" + mqttMessage.toString() + "' for topic '" + topic + "'.");
} }
Thread messageProcessorThread = new Thread() { Thread messageProcessorThread = new Thread() {
public void run() { public void run() {
processIncomingMessage(mqttMessage, topic); try {
processIncomingMessage(mqttMessage, topic);
} catch (TransportHandlerException e) {
log.error("An error occurred when trying to process received MQTT message [" + mqttMessage + "] " +
"for topic [" + topic + "].", e);
}
} }
}; };
messageProcessorThread.setDaemon(true); messageProcessorThread.setDaemon(true);

@ -61,7 +61,6 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
private static final int DEFAULT_XMPP_PORT = 5222; private static final int DEFAULT_XMPP_PORT = 5222;
private XMPPConnection connection; private XMPPConnection connection;
private int port; private int port;
private ConnectionConfiguration config;
private PacketFilter filter; private PacketFilter filter;
private PacketListener listener; private PacketListener listener;
@ -71,6 +70,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
* *
* @param server the IP of the XMPP server. * @param server the IP of the XMPP server.
*/ */
@SuppressWarnings("unused")
protected XMPPTransportHandler(String server) { protected XMPPTransportHandler(String server) {
this.server = server; this.server = server;
this.port = DEFAULT_XMPP_PORT; this.port = DEFAULT_XMPP_PORT;
@ -99,6 +99,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
* @param port the XMPP server's port to connect to. (default - 5222) * @param port the XMPP server's port to connect to. (default - 5222)
* @param timeoutInterval the timeout interval to use for the connection and reconnection * @param timeoutInterval the timeout interval to use for the connection and reconnection
*/ */
@SuppressWarnings("unused")
protected XMPPTransportHandler(String server, int port, int timeoutInterval) { protected XMPPTransportHandler(String server, int port, int timeoutInterval) {
this.server = server; this.server = server;
this.port = port; this.port = port;
@ -112,6 +113,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
* @param millis the time in millis to be set as the time-out-limit whilst waiting for a * @param millis the time in millis to be set as the time-out-limit whilst waiting for a
* XMPP-reply. * XMPP-reply.
*/ */
@SuppressWarnings("unused")
public void setTimeoutInterval(int millis) { public void setTimeoutInterval(int millis) {
this.timeoutInterval = millis; this.timeoutInterval = millis;
} }
@ -135,7 +137,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
log.info(String.format("Initializing connection to XMPP Server at %1$s via port " + log.info(String.format("Initializing connection to XMPP Server at %1$s via port " +
"%2$d.", server, port)); "%2$d.", server, port));
SmackConfiguration.setPacketReplyTimeout(timeoutInterval); SmackConfiguration.setPacketReplyTimeout(timeoutInterval);
config = new ConnectionConfiguration(server, port); ConnectionConfiguration config = new ConnectionConfiguration(server, port);
// TODO:: Need to enable SASL-Authentication appropriately // TODO:: Need to enable SASL-Authentication appropriately
config.setSASLAuthenticationEnabled(false); config.setSASLAuthenticationEnabled(false);
config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled); config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled);
@ -214,6 +216,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
* *
* @param senderJID the JID (XMPP-Account ID of the sender) to which the filter is to be set. * @param senderJID the JID (XMPP-Account ID of the sender) to which the filter is to be set.
*/ */
@SuppressWarnings("unused")
protected void setFilterOnSender(String senderJID) { protected void setFilterOnSender(String senderJID) {
filter = new AndFilter(new PacketTypeFilter(Message.class), new FromContainsFilter( filter = new AndFilter(new PacketTypeFilter(Message.class), new FromContainsFilter(
senderJID)); senderJID));
@ -224,7 +227,12 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
final Message xmppMessage = (Message) packet; final Message xmppMessage = (Message) packet;
Thread msgProcessThread = new Thread() { Thread msgProcessThread = new Thread() {
public void run() { public void run() {
processIncomingMessage(xmppMessage); try {
processIncomingMessage(xmppMessage);
} catch (TransportHandlerException e) {
log.error("An error occurred when trying to process received XMPP message " +
"[" + xmppMessage.getBody() + "].", e);
}
} }
}; };
msgProcessThread.setDaemon(true); msgProcessThread.setDaemon(true);
@ -255,7 +263,12 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
final Message xmppMessage = (Message) packet; final Message xmppMessage = (Message) packet;
Thread msgProcessThread = new Thread() { Thread msgProcessThread = new Thread() {
public void run() { public void run() {
processIncomingMessage(xmppMessage); try {
processIncomingMessage(xmppMessage);
} catch (TransportHandlerException e) {
log.error("An error occurred when trying to process received XMPP message " +
"[" + xmppMessage.getBody() + "].", e);
}
} }
}; };
msgProcessThread.setDaemon(true); msgProcessThread.setDaemon(true);
@ -280,6 +293,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
* if false: then the filter is set with 'OR' operator (senderJID | * if false: then the filter is set with 'OR' operator (senderJID |
* receiverJID) * receiverJID)
*/ */
@SuppressWarnings("unused")
protected void setMessageFilterAndListener(String senderJID, String receiverJID, boolean protected void setMessageFilterAndListener(String senderJID, String receiverJID, boolean
andCondition) { andCondition) {
PacketFilter jidFilter; PacketFilter jidFilter;
@ -300,7 +314,12 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
final Message xmppMessage = (Message) packet; final Message xmppMessage = (Message) packet;
Thread msgProcessThread = new Thread() { Thread msgProcessThread = new Thread() {
public void run() { public void run() {
processIncomingMessage(xmppMessage); try {
processIncomingMessage(xmppMessage);
} catch (TransportHandlerException e) {
log.error("An error occurred when trying to process received XMPP message " +
"[" + xmppMessage.getBody() + "].", e);
}
} }
}; };
msgProcessThread.setDaemon(true); msgProcessThread.setDaemon(true);
@ -319,6 +338,7 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
* @param JID the JID (XMPP Account ID) to which the message is to be sent to. * @param JID the JID (XMPP Account ID) to which the message is to be sent to.
* @param message the XMPP-Message that is to be sent. * @param message the XMPP-Message that is to be sent.
*/ */
@SuppressWarnings("unused")
protected void sendXMPPMessage(String JID, String message) { protected void sendXMPPMessage(String JID, String message) {
sendXMPPMessage(JID, message, "XMPP-Message"); sendXMPPMessage(JID, message, "XMPP-Message");
} }
@ -351,20 +371,11 @@ public abstract class XMPPTransportHandler implements TransportHandler<Message>
protected void sendXMPPMessage(String JID, Message xmppMessage) { protected void sendXMPPMessage(String JID, Message xmppMessage) {
connection.sendPacket(xmppMessage); connection.sendPacket(xmppMessage);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Message: '" + xmppMessage.getBody() + "' sent to XMPP JID [" + JID + log.debug("Message: '" + xmppMessage.getBody() + "' sent to XMPP JID [" + JID + "] sent successfully.");
"] sent successfully.");
} }
} }
/**
* Disables default debugger provided by the XMPPConnection.
*/
protected void disableDebugger() {
connection.DEBUG_ENABLED = false;
}
/** /**
* Closes the connection to the XMPP Server. * Closes the connection to the XMPP Server.
*/ */

Loading…
Cancel
Save