changes to the VirtualFireAlarm service to send and receive XMPP,MQTT messages

application-manager-new
Shabirmean 9 years ago
parent ec47429127
commit 75b4e5ff27

@ -52,6 +52,7 @@ import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util
.VirtualFireAlarmXMPPConnector;
import org.wso2.carbon.utils.CarbonUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@ -107,18 +108,38 @@ public class VirtualFireAlarmService {
new ConcurrentHashMap<String, String>();
public void setVirtualFireAlarmXMPPConnector(
VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) {
final VirtualFireAlarmXMPPConnector virtualFireAlarmXMPPConnector) {
this.virtualFireAlarmXMPPConnector = virtualFireAlarmXMPPConnector;
Runnable mqttStarter = new Runnable() {
@Override
public void run() {
virtualFireAlarmXMPPConnector.initConnector();
virtualFireAlarmXMPPConnector.connectAndLogin();
}
};
Thread mqttStarterThread = new Thread(mqttStarter);
mqttStarterThread.setDaemon(true);
mqttStarterThread.start();
}
public void setVirtualFireAlarmMQTTSubscriber(
VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) {
final VirtualFireAlarmMQTTSubscriber virtualFireAlarmMQTTSubscriber) {
this.virtualFireAlarmMQTTSubscriber = virtualFireAlarmMQTTSubscriber;
Runnable xmppStarter = new Runnable() {
@Override
public void run() {
virtualFireAlarmMQTTSubscriber.initConnector();
virtualFireAlarmMQTTSubscriber.connectAndSubscribe();
}
};
Thread xmppStarterThread = new Thread(xmppStarter);
xmppStarterThread.setDaemon(true);
xmppStarterThread.start();
}
public VirtualFireAlarmXMPPConnector getVirtualFireAlarmXMPPConnector() {
return virtualFireAlarmXMPPConnector;
@ -128,6 +149,10 @@ public class VirtualFireAlarmService {
return virtualFireAlarmMQTTSubscriber;
}
/* ---------------------------------------------------------------------------------------
Device management specific APIs
Also contains utility methods required for the execution of these APIs
--------------------------------------------------------------------------------------- */
@Path("manager/device/register")
@PUT
public boolean register(@QueryParam("deviceId") String deviceId,
@ -413,18 +438,27 @@ public class VirtualFireAlarmService {
return Long.toString(l, Character.MAX_RADIX);
}
@Path("controller/register/{owner}/{deviceId}/{ip}")
/* ---------------------------------------------------------------------------------------
Device specific APIs - Control APIs + Data-Publishing APIs
Also contains utility methods required for the execution of these APIs
--------------------------------------------------------------------------------------- */
@Path("controller/register/{owner}/{deviceId}/{ip}/{port}")
@POST
public String registerDeviceIP(@PathParam("owner") String owner,
@PathParam("deviceId") String deviceId,
@PathParam("ip") String deviceIP,
@Context HttpServletResponse response) {
@PathParam("port") String devicePort,
@Context HttpServletResponse response,
@Context HttpServletRequest request) {
//TODO:: Need to get IP from the request itself
String result;
log.info("Got register call from IP: " + deviceIP + " for Device ID: " + deviceId +
" of owner: " + owner);
deviceToIpMap.put(deviceId, deviceIP);
String deviceHttpEndpoint = deviceIP + ":" + devicePort;
deviceToIpMap.put(deviceId, deviceHttpEndpoint);
result = "Device-IP Registered";
response.setStatus(Response.Status.OK.getStatusCode());
@ -436,7 +470,6 @@ public class VirtualFireAlarmService {
return result;
}
/* Service to switch "ON" and "OFF" the Virtual FireAlarm bulb
Called by an external client intended to control the Virtual FireAlarm bulb */
@Path("controller/bulb/{state}")
@ -469,45 +502,34 @@ public class VirtualFireAlarmService {
return;
}
String deviceIP = deviceToIpMap.get(deviceId);
if (deviceIP == null) {
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
return;
}
String protocolString = protocol.toUpperCase();
String callUrlPattern = VirtualFireAlarmConstants.BULB_CONTEXT + switchToState;
log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP +
" " +
"via" + " " + protocolString);
log.info("Sending request to switch-bulb of device [" + deviceId + "] via " + protocolString);
try {
switch (protocolString) {
case HTTP_PROTOCOL:
sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true);
String deviceHTTPEndpoint = deviceToIpMap.get(deviceId);
if (deviceHTTPEndpoint == null) {
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
return;
}
sendCommandViaHTTP(deviceHTTPEndpoint, callUrlPattern, true);
break;
case MQTT_PROTOCOL:
sendCommandViaMQTT(owner, deviceId,
VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""),
switchToState);
sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""), switchToState);
break;
case XMPP_PROTOCOL:
sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT,
switchToState);
sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT, switchToState);
break;
default:
if (protocolString == null) {
sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true);
} else {
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
return;
}
break;
}
} catch (DeviceManagementException e) {
log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" +
" " + protocol);
log.error("Failed to send switch-bulb request to device [" + deviceId + "] via " + protocolString);
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
return;
}
@ -526,9 +548,7 @@ public class VirtualFireAlarmService {
DeviceValidator deviceValidator = new DeviceValidator();
try {
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
VirtualFireAlarmConstants
.DEVICE_TYPE))) {
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) {
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
return "Unauthorized Access";
}
@ -538,52 +558,35 @@ public class VirtualFireAlarmService {
return replyMsg;
}
String deviceIp = deviceToIpMap.get(deviceId);
String protocolString = protocol.toUpperCase();
log.info("Sending request to read sonar value of device [" + deviceId + "] via " + protocolString);
if (deviceIp == null) {
try {
switch (protocolString) {
case HTTP_PROTOCOL:
String deviceHTTPEndpoint = deviceToIpMap.get(deviceId);
if (deviceHTTPEndpoint == null) {
replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner;
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
return replyMsg;
}
try {
switch (protocol) {
case HTTP_PROTOCOL:
log.info("Sending request to read sonar value at : " + deviceIp + " via " +
HTTP_PROTOCOL);
replyMsg = sendCommandViaHTTP(deviceIp, 9090,
VirtualFireAlarmConstants.SONAR_CONTEXT, false);
replyMsg = sendCommandViaHTTP(deviceHTTPEndpoint, VirtualFireAlarmConstants.SONAR_CONTEXT, false);
break;
case MQTT_PROTOCOL:
log.info("Sending request to read sonar value at : " + deviceIp + " via " +
MQTT_PROTOCOL);
sendCommandViaMQTT(owner, deviceId,
VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""),
"");
sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""), "");
break;
case XMPP_PROTOCOL:
log.info("Sending request to read sonar value at : " + deviceIp + " via " +
XMPP_PROTOCOL);
replyMsg = sendCommandViaXMPP(owner, deviceId,
VirtualFireAlarmConstants.SONAR_CONTEXT, ".");
replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, "");
break;
default:
if (protocol == null) {
log.info("Sending request to read sonar value at : " + deviceIp + " via " +
HTTP_PROTOCOL);
replyMsg = sendCommandViaHTTP(deviceIp, 9090,
VirtualFireAlarmConstants.SONAR_CONTEXT,
false);
} else {
replyMsg = "Requested protocol '" + protocol + "' is not supported";
replyMsg = "Requested protocol '" + protocolString + "' is not supported";
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
return replyMsg;
}
break;
}
} catch (DeviceManagementException e) {
replyMsg = e.getErrorMessage();
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
@ -606,9 +609,7 @@ public class VirtualFireAlarmService {
DeviceValidator deviceValidator = new DeviceValidator();
try {
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
VirtualFireAlarmConstants
.DEVICE_TYPE))) {
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId, VirtualFireAlarmConstants.DEVICE_TYPE))) {
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
return "Unauthorized Access";
}
@ -618,56 +619,36 @@ public class VirtualFireAlarmService {
return replyMsg;
}
String deviceIp = deviceToIpMap.get(deviceId);
String protocolString = protocol.toUpperCase();
log.info("Sending request to read virtual-firealarm-temperature of device [" + deviceId + "] via " + protocolString);
if (deviceIp == null) {
try {
switch (protocolString) {
case HTTP_PROTOCOL:
String deviceHTTPEndpoint = deviceToIpMap.get(deviceId);
if (deviceHTTPEndpoint == null) {
replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner;
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
return replyMsg;
}
try {
switch (protocol) {
case HTTP_PROTOCOL:
log.info("Sending request to read virtual-firealarm-temperature at : " +
deviceIp + " via " + HTTP_PROTOCOL);
replyMsg = sendCommandViaHTTP(deviceIp, 9090,
VirtualFireAlarmConstants.TEMPERATURE_CONTEXT,
false);
replyMsg = sendCommandViaHTTP(deviceHTTPEndpoint, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false);
break;
case MQTT_PROTOCOL:
log.info("Sending request to read virtual-firealarm-temperature at : " +
deviceIp + " via " + MQTT_PROTOCOL);
sendCommandViaMQTT(owner, deviceId,
VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/",
""),
"");
sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""), "");
break;
case XMPP_PROTOCOL:
log.info("Sending request to read virtual-firealarm-temperature at : " +
deviceIp + " via " + XMPP_PROTOCOL);
replyMsg = sendCommandViaXMPP(owner, deviceId,
VirtualFireAlarmConstants.TEMPERATURE_CONTEXT,
".");
replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "");
break;
default:
if (protocol == null) {
log.info("Sending request to read virtual-firealarm-temperature at : " +
deviceIp + " via " + HTTP_PROTOCOL);
replyMsg = sendCommandViaHTTP(deviceIp, 9090,
VirtualFireAlarmConstants
.TEMPERATURE_CONTEXT,
false);
} else {
replyMsg = "Requested protocol '" + protocol + "' is not supported";
replyMsg = "Requested protocol '" + protocolString + "' is not supported";
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
return replyMsg;
}
break;
}
} catch (DeviceManagementException e) {
replyMsg = e.getErrorMessage();
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
@ -682,8 +663,7 @@ public class VirtualFireAlarmService {
@Path("controller/push_temperature")
@POST
@Consumes(MediaType.APPLICATION_JSON)
public void pushTemperatureData(
final DeviceJSON dataMsg, @Context HttpServletResponse response) {
public void pushTemperatureData(final DeviceJSON dataMsg, @Context HttpServletResponse response) {
boolean result;
String deviceId = dataMsg.deviceId;
String deviceIp = dataMsg.reply;
@ -708,11 +688,9 @@ public class VirtualFireAlarmService {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
ctx.setTenantDomain(SUPER_TENANT, true);
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx
.getOSGiService(DeviceAnalyticsService.class, null);
Object metdaData[] =
{dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId,
System.currentTimeMillis()};
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx.getOSGiService(
DeviceAnalyticsService.class, null);
Object metdaData[] = {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId, System.currentTimeMillis()};
Object payloadData[] = {temperature};
try {
deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0",
@ -723,151 +701,8 @@ public class VirtualFireAlarmService {
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
/* Service to push all the sensor data collected by the FireAlarm
Called by the FireAlarm device */
// @Path("/pushalarmdata")
// @POST
// @Consumes(MediaType.APPLICATION_JSON)
// public void pushAlarmData(final DeviceJSON dataMsg, @Context HttpServletResponse response) {
// boolean result;
// String sensorValues = dataMsg.value;
// log.info("Recieved Sensor Data Values: " + sensorValues);
//
// String sensors[] = sensorValues.split(":");
// try {
// if (sensors.length == 3) {
// String temperature = sensors[0];
// String bulb = sensors[1];
// String sonar = sensors[2];
// sensorValues = "Temperature:" + temperature + "C\tBulb Status:" + bulb +
// "\t\tSonar Status:" + sonar;
// log.info(sensorValues);
// DeviceController deviceController = new DeviceController();
// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants
// .DEVICE_TYPE,
// dataMsg.deviceId,
// System.currentTimeMillis(), "DeviceData",
// temperature, "TEMPERATURE");
//
// if (!result) {
// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
// log.error("Error whilst pushing temperature: " + sensorValues);
// return;
// }
//
// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants
// .DEVICE_TYPE,
// dataMsg.deviceId,
// System.currentTimeMillis(), "DeviceData",
// bulb,
// "BULB");
//
// if (!result) {
// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
// log.error("Error whilst pushing Bulb data: " + sensorValues);
// return;
// }
//
// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants
// .DEVICE_TYPE,
// dataMsg.deviceId,
// System.currentTimeMillis(), "DeviceData",
// sonar,
// "SONAR");
//
// if (!result) {
// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
// log.error("Error whilst pushing Sonar data: " + sensorValues);
// }
//
// } else {
// DeviceController deviceController = new DeviceController();
// result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants
// .DEVICE_TYPE,
// dataMsg.deviceId,
// System.currentTimeMillis(), "DeviceData",
// dataMsg.value, dataMsg.reply);
// if (!result) {
// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
// log.error("Error whilst pushing sensor data: " + sensorValues);
// }
// }
//
// } catch (UnauthorizedException e) {
// response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
// log.error("Data Push Attempt Failed at Publisher: " + e.getMessage());
// }
// }
private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource,
String state) throws DeviceManagementException {
String replyMsg = "";
String scriptArguments = "";
String command = "";
String seperator = File.separator;
String xmppServerURL = XmppConfig.getInstance().getXmppEndpoint();
int indexOfChar = xmppServerURL.lastIndexOf(seperator);
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring((indexOfChar + 1), xmppServerURL.length());
}
indexOfChar = xmppServerURL.indexOf(":");
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring(0, indexOfChar);
}
String xmppAdminUName = XmppConfig.getInstance().getXmppUsername();
String xmppAdminPass = XmppConfig.getInstance().getXmppPassword();
String xmppAdminUserLogin = xmppAdminUName + "@" + xmppServerURL + seperator + deviceOwner;
String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner;
String scriptsFolder = "repository" + seperator + "resources" + seperator + "scripts";
String scriptPath = CarbonUtils.getCarbonHome() + seperator + scriptsFolder + seperator
+ "xmpp_client.py ";
scriptArguments =
"-j " + xmppAdminUserLogin + " -p " + xmppAdminPass + " -c " + clientToConnect +
" -r " + resource + " -s " + state;
command = "python " + scriptPath + scriptArguments;
if (log.isDebugEnabled()) {
log.debug("Connecting to XMPP Server via Admin credentials: " + xmppAdminUserLogin);
log.debug("Trying to contact xmpp device account: " + clientToConnect);
log.debug("Arguments used for the scripts: '" + scriptArguments + "'");
log.debug("Command exceuted: '" + command + "'");
}
// switch (resource) {
// case BULB_CONTEXT:
// scriptArguments = "-r Bulb -s " + state;
// command = "python " + scriptPath + scriptArguments;
// break;
// case SONAR_CONTEXT:
// scriptArguments = "-r Sonar";
// command = "python " + scriptPath + scriptArguments;
// break;
// case TEMPERATURE_CONTEXT:
// scriptArguments = "-r Temperature";
// command = "python " + scriptPath + scriptArguments;
// break;
// }
replyMsg = executeCommand(command);
return replyMsg;
}
private String executeCommand(String command) {
StringBuffer output = new StringBuffer();
@ -891,39 +726,10 @@ public class VirtualFireAlarmService {
}
private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource,
String state) throws DeviceManagementException {
boolean result = false;
DeviceController deviceController = new DeviceController();
try {
result = deviceController.publishMqttControl(deviceOwner,
VirtualFireAlarmConstants.DEVICE_TYPE,
deviceId, resource, state);
} catch (DeviceControllerException e) {
String errorMsg = "Error whilst trying to publish to MQTT Queue";
log.error(errorMsg);
throw new DeviceManagementException(errorMsg, e);
}
return result;
}
private String sendCommandViaHTTP(final String deviceIp, int deviceServerPort,
String callUrlPattern,
boolean fireAndForgot)
throws DeviceManagementException {
if (deviceServerPort == 0) {
deviceServerPort = 9090;
}
private String sendCommandViaHTTP(final String deviceHTTPEndpoint, String urlContext, boolean fireAndForgot) throws DeviceManagementException {
String responseMsg = "";
String urlString =
VirtualFireAlarmConstants.URL_PREFIX + deviceIp + ":" + deviceServerPort +
callUrlPattern;
String urlString = VirtualFireAlarmConstants.URL_PREFIX + deviceHTTPEndpoint + urlContext;
if (log.isDebugEnabled()) {
log.debug(urlString);
@ -988,13 +794,93 @@ public class VirtualFireAlarmService {
}
}
}
}
return responseMsg;
}
/* Utility methods relevant to creating and sending http requests */
private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource, String state) throws DeviceManagementException {
boolean result = false;
DeviceController deviceController = new DeviceController();
try {
result = deviceController.publishMqttControl(deviceOwner,
VirtualFireAlarmConstants.DEVICE_TYPE,
deviceId, resource, state);
} catch (DeviceControllerException e) {
String errorMsg = "Error whilst trying to publish to MQTT Queue";
log.error(errorMsg);
throw new DeviceManagementException(errorMsg, e);
}
return result;
}
private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, String state) throws DeviceManagementException {
String replyMsg = "";
// String scriptArguments = "";
// String command = "";
String seperator = File.separator;
String xmppServerURL = XmppConfig.getInstance().getXmppEndpoint();
int indexOfChar = xmppServerURL.lastIndexOf(seperator);
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring((indexOfChar + 1), xmppServerURL.length());
}
indexOfChar = xmppServerURL.indexOf(":");
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring(0, indexOfChar);
}
// String xmppAdminUName = XmppConfig.getInstance().getXmppUsername();
// String xmppAdminPass = XmppConfig.getInstance().getXmppPassword();
// String xmppAdminUserLogin = xmppAdminUName + "@" + xmppServerURL + seperator + deviceOwner;
// String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner;
String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner;
String message = resource.replace("/","") + ":" + state;
virtualFireAlarmXMPPConnector.sendXMPPMessage(clientToConnect, message, "CONTROL-REQUEST");
// String scriptsFolder = "repository" + seperator + "resources" + seperator + "scripts";
// String scriptPath = CarbonUtils.getCarbonHome() + seperator + scriptsFolder + seperator
// + "xmpp_client.py ";
//
// scriptArguments =
// "-j " + xmppAdminUserLogin + " -p " + xmppAdminPass + " -c " + clientToConnect +
// " -r " + resource + " -s " + state;
// command = "python " + scriptPath + scriptArguments;
//
// if (log.isDebugEnabled()) {
// log.debug("Connecting to XMPP Server via Admin credentials: " + xmppAdminUserLogin);
// log.debug("Trying to contact xmpp device account: " + clientToConnect);
// log.debug("Arguments used for the scripts: '" + scriptArguments + "'");
// log.debug("Command exceuted: '" + command + "'");
// }
// switch (resource) {
// case BULB_CONTEXT:
// scriptArguments = "-r Bulb -s " + state;
// command = "python " + scriptPath + scriptArguments;
// break;
// case SONAR_CONTEXT:
// scriptArguments = "-r Sonar";
// command = "python " + scriptPath + scriptArguments;
// break;
// case TEMPERATURE_CONTEXT:
// scriptArguments = "-r Temperature";
// command = "python " + scriptPath + scriptArguments;
// break;
// }
// replyMsg = executeCommand(command);
return replyMsg;
}
/* ---------------------------------------------------------------------------------------
Utility methods relevant to creating and sending http requests
--------------------------------------------------------------------------------------- */
/* This methods creates and returns a http connection object */

@ -15,10 +15,12 @@ import java.util.UUID;
public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber {
private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTSubscriber.class);
// wso2/iot/shabirmean/virtual_firealarm/t4ctwq8qfl11/publisher
private static final String subscribeTopic =
"wso2" + File.separator + "iot" + File.separator + "+" + File.separator +
VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator +
"reply";
"publisher";
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
private static String mqttEndpoint;
@ -42,7 +44,20 @@ public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber {
@Override
protected void postMessageArrived(String topic, MqttMessage message) {
log.info("Message " + message.toString() + " was received for topic: " + topic);
String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, "");
ownerAndId = ownerAndId.replace(File.separator + VirtualFireAlarmConstants.DEVICE_TYPE + File.separator, ":");
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
String owner = ownerAndId.split(":")[0];
String deviceId = ownerAndId.split(":")[1];
log.info("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
if (message.toString().contains("PUBLISHER")) {
log.info("Received MQTT publisher message [" + message.toString() + "] topic: [" + topic + "]");
} else {
log.info("Received MQTT reply message [" + message.toString() + "] topic: [" + topic + "]");
}
}
private void retryMQTTSubscription() {

@ -41,8 +41,25 @@ public class VirtualFireAlarmXMPPConnector extends XmppConnector {
@Override
protected void processXMPPMessage(Message xmppMessage) {
String from = xmppMessage.getFrom();
String subject = xmppMessage.getSubject();
String message = xmppMessage.getBody();
log.info("Received XMPP message '" + message + "' from " + from);
int indexOfAt = from.indexOf("@");
int indexOfSlash = from.indexOf("/");
String deviceId = from.substring(0, indexOfAt);
String owner = from.substring(indexOfSlash + 1, from.length());
log.info("Received XMPP message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
if (subject.equals("PUBLISHER")) {
log.info("Received XMPP publisher message [" + message + "] from [" + from + "]");
} else if(subject.equals("CONTROL-REPLY")) {
log.info("Received XMPP reply message [" + message + "] from [" + from + "]");
} else {
log.info("Received SOME XMPP message [" + message + "] from " + from + "]");
}
}
private void retryXMPPConnection() {

Loading…
Cancel
Save