added changes to publish data received via MQTT and XMPP

merge-requests/1/head
Shabirmean 9 years ago
parent 75b4e5ff27
commit e9d08faa6c

@ -28,7 +28,6 @@ xmpp-ep=${XMPP_EP}
auth-method=token
auth-token=${DEVICE_TOKEN}
refresh-token=${DEVICE_REFRESH_TOKEN}
network-interface=wlan0
push-interval=15

@ -10,28 +10,28 @@ echo "----------------------------------------------------------------"
unzip firealarm-virtual-agent-1.0-SNAPSHOT-jar-with-dependencies.jar.zip
while true; do
read -p "What is the network-interface of your device that the Agent should use (find from ifconfig. ex: wlan0,en0,eth0..) > " interface
echo "Setting the network-interface to " $interface
sed s/^network-interface=.*/network-interface=$interface/ deviceConfig.properties > myTmp
mv -f myTmp deviceConfig.properties
break;
done
while true; do
read -p "Whats the time-interval (in seconds) between successive Data-Pushes to the WSO2-IoT-Server (ex: '60' indicates 1 minute) > " interval
if [ $interval -eq $interval 2>/dev/null ]
then
echo "Setting data-push interval to " $interval " seconds."
sed s/^push-interval=.*/push-interval=$interval/ deviceConfig.properties > myTmp
mv -f myTmp deviceConfig.properties
break;
else
echo "Input needs to be an integer indicating the number seconds between successive data-pushes."
fi
done
#while true; do
# read -p "What is the network-interface of your device that the Agent should use (find from ifconfig. ex: wlan0,en0,eth0..) > " interface
#
# echo "Setting the network-interface to " $interface
# sed s/^network-interface=.*/network-interface=$interface/ deviceConfig.properties > myTmp
# mv -f myTmp deviceConfig.properties
# break;
#done
#
#while true; do
# read -p "Whats the time-interval (in seconds) between successive Data-Pushes to the WSO2-IoT-Server (ex: '60' indicates 1 minute) > " interval
#
# if [ $interval -eq $interval 2>/dev/null ]
# then
# echo "Setting data-push interval to " $interval " seconds."
# sed s/^push-interval=.*/push-interval=$interval/ deviceConfig.properties > myTmp
# mv -f myTmp deviceConfig.properties
# break;
# else
# echo "Input needs to be an integer indicating the number seconds between successive data-pushes."
# fi
#done
java -jar wso2-firealarm-virtual-agent.jar

@ -92,7 +92,7 @@ public class VirtualFireAlarmService {
private static Log log = LogFactory.getLog(VirtualFireAlarmService.class);
//TODO; replace this tenant domain
private final String SUPER_TENANT = "carbon.super";
private static final String SUPER_TENANT = "carbon.super";
@Context //injected response proxy supporting multiple thread
private HttpServletResponse response;
@ -579,7 +579,7 @@ public class VirtualFireAlarmService {
break;
case XMPP_PROTOCOL:
replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, "");
sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, "");
break;
default:
@ -641,7 +641,7 @@ public class VirtualFireAlarmService {
break;
case XMPP_PROTOCOL:
replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "");
sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, "");
break;
default:
@ -685,44 +685,9 @@ public class VirtualFireAlarmService {
return;
}
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
ctx.setTenantDomain(SUPER_TENANT, true);
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx.getOSGiService(
DeviceAnalyticsService.class, null);
Object metdaData[] = {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId, System.currentTimeMillis()};
Object payloadData[] = {temperature};
try {
deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0",
metdaData, new Object[0], payloadData);
} catch (DataPublisherConfigurationException e) {
if (!publishToDAS(dataMsg.owner, dataMsg.deviceId, dataMsg.value)) {
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
private String executeCommand(String command) {
StringBuffer output = new StringBuffer();
Process p;
try {
p = Runtime.getRuntime().exec(command);
p.waitFor();
BufferedReader reader =
new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = "";
while ((line = reader.readLine()) != null) {
output.append(line + "\n");
}
} catch (Exception e) {
log.info(e.getMessage(), e);
}
return output.toString();
}
@ -816,66 +781,23 @@ public class VirtualFireAlarmService {
return result;
}
private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, String state) throws DeviceManagementException {
private void sendCommandViaXMPP(String deviceOwner, String deviceId, String resource, String state) throws DeviceManagementException {
String replyMsg = "";
// String scriptArguments = "";
// String command = "";
String seperator = File.separator;
String xmppServerURL = XmppConfig.getInstance().getXmppEndpoint();
int indexOfChar = xmppServerURL.lastIndexOf(seperator);
String xmppServerDomain = XmppConfig.getInstance().getXmppEndpoint();
int indexOfChar = xmppServerDomain.lastIndexOf(File.separator);
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring((indexOfChar + 1), xmppServerURL.length());
xmppServerDomain = xmppServerDomain.substring((indexOfChar + 1), xmppServerDomain.length());
}
indexOfChar = xmppServerURL.indexOf(":");
indexOfChar = xmppServerDomain.indexOf(":");
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring(0, indexOfChar);
xmppServerDomain = xmppServerDomain.substring(0, indexOfChar);
}
// String xmppAdminUName = XmppConfig.getInstance().getXmppUsername();
// String xmppAdminPass = XmppConfig.getInstance().getXmppPassword();
// String xmppAdminUserLogin = xmppAdminUName + "@" + xmppServerURL + seperator + deviceOwner;
// String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner;
String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner;
String clientToConnect = deviceId + "@" + xmppServerDomain + File.separator + deviceOwner;
String message = resource.replace("/","") + ":" + state;
virtualFireAlarmXMPPConnector.sendXMPPMessage(clientToConnect, message, "CONTROL-REQUEST");
// String scriptsFolder = "repository" + seperator + "resources" + seperator + "scripts";
// String scriptPath = CarbonUtils.getCarbonHome() + seperator + scriptsFolder + seperator
// + "xmpp_client.py ";
//
// scriptArguments =
// "-j " + xmppAdminUserLogin + " -p " + xmppAdminPass + " -c " + clientToConnect +
// " -r " + resource + " -s " + state;
// command = "python " + scriptPath + scriptArguments;
//
// if (log.isDebugEnabled()) {
// log.debug("Connecting to XMPP Server via Admin credentials: " + xmppAdminUserLogin);
// log.debug("Trying to contact xmpp device account: " + clientToConnect);
// log.debug("Arguments used for the scripts: '" + scriptArguments + "'");
// log.debug("Command exceuted: '" + command + "'");
// }
// switch (resource) {
// case BULB_CONTEXT:
// scriptArguments = "-r Bulb -s " + state;
// command = "python " + scriptPath + scriptArguments;
// break;
// case SONAR_CONTEXT:
// scriptArguments = "-r Sonar";
// command = "python " + scriptPath + scriptArguments;
// break;
// case TEMPERATURE_CONTEXT:
// scriptArguments = "-r Temperature";
// command = "python " + scriptPath + scriptArguments;
// break;
// }
// replyMsg = executeCommand(command);
return replyMsg;
}
/* ---------------------------------------------------------------------------------------
@ -948,4 +870,23 @@ public class VirtualFireAlarmService {
return completeResponse.toString();
}
public static boolean publishToDAS(String owner, String deviceId, float temperature){
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
ctx.setTenantDomain(SUPER_TENANT, true);
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx.getOSGiService(
DeviceAnalyticsService.class, null);
Object metdaData[] = {owner, VirtualFireAlarmConstants.DEVICE_TYPE, deviceId, System.currentTimeMillis()};
Object payloadData[] = {temperature};
try {
deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0", metdaData, new Object[0], payloadData);
} catch (DataPublisherConfigurationException e) {
return false;
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
return true;
}
}

@ -3,19 +3,23 @@ package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.uti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmService;
import javax.ws.rs.core.Response;
import java.io.File;
import java.util.UUID;
public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber {
private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTSubscriber.class);
// wso2/iot/shabirmean/virtual_firealarm/t4ctwq8qfl11/publisher
private static final String subscribeTopic =
"wso2" + File.separator + "iot" + File.separator + "+" + File.separator +
VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator +
@ -54,9 +58,18 @@ public class VirtualFireAlarmMQTTSubscriber extends MqttSubscriber {
log.info("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
if (message.toString().contains("PUBLISHER")) {
log.info("Received MQTT publisher message [" + message.toString() + "] topic: [" + topic + "]");
log.info("MQTT: Publisher Message [" + message.toString() + "] topic: [" + topic + "]");
float temperature = Float.parseFloat(message.toString().split(":")[2]);
if(!VirtualFireAlarmService.publishToDAS(owner, deviceId, temperature)) {
log.error("MQTT Subscriber: Publishing data to DAS failed.");
}
if(log.isDebugEnabled()) {
log.debug("MQTT Subscriber: Published data to DAS successfully.");
}
} else {
log.info("Received MQTT reply message [" + message.toString() + "] topic: [" + topic + "]");
log.info("MQTT: Reply Message [" + message.toString() + "] topic: [" + topic + "]");
}
}

@ -3,9 +3,15 @@ package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.uti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jivesoftware.smack.packet.Message;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConnector;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
.VirtualFireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmService;
public class VirtualFireAlarmXMPPConnector extends XmppConnector {
private static Log log = LogFactory.getLog(VirtualFireAlarmXMPPConnector.class);
@ -53,11 +59,20 @@ public class VirtualFireAlarmXMPPConnector extends XmppConnector {
log.info("Received XMPP message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
if (subject.equals("PUBLISHER")) {
log.info("Received XMPP publisher message [" + message + "] from [" + from + "]");
log.info("XMPP: Publisher Message [" + message + "] from [" + from + "]");
float temperature = Float.parseFloat(message.split(":")[1]);
if(!VirtualFireAlarmService.publishToDAS(owner, deviceId, temperature)) {
log.error("XMPP Connector: Publishing data to DAS failed.");
}
if(log.isDebugEnabled()) {
log.debug("XMPP Connector: Published data to DAS successfully.");
}
} else if(subject.equals("CONTROL-REPLY")) {
log.info("Received XMPP reply message [" + message + "] from [" + from + "]");
log.info("XMPP: Reply Message [" + message + "] from [" + from + "]");
} else {
log.info("Received SOME XMPP message [" + message + "] from " + from + "]");
log.info("SOME XMPP Message [" + message + "] from " + from + "]");
}
}
@ -100,6 +115,4 @@ public class VirtualFireAlarmXMPPConnector extends XmppConnector {
retryToConnect.setDaemon(true);
retryToConnect.start();
}
}

Loading…
Cancel
Save