changes to implement XMPP communication

application-manager-new
Shabir Mohamed 10 years ago
parent 570a8ba0ba
commit 0c6bc2e400

@ -28,11 +28,12 @@ import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.iot.common.DeviceController;
import org.wso2.carbon.device.mgt.iot.common.DeviceValidator;
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
import org.wso2.carbon.device.mgt.iot.common.datastore.impl.DataStreamDefinitions;
import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException;
import org.wso2.carbon.device.mgt.iot.common.exception.UnauthorizedException;
import org.wso2.carbon.device.mgt.iot.sample.firealarm.service.impl.util.DeviceJSON;
import org.wso2.carbon.device.mgt.iot.sample.firealarm.plugin.constants.FireAlarmConstants;
import org.wso2.carbon.device.mgt.iot.sample.firealarm.service.impl.util.DeviceJSON;
import org.wso2.carbon.utils.CarbonUtils;
import javax.servlet.http.HttpServletResponse;
@ -63,8 +64,8 @@ public class FireAlarmControllerService {
private static final String URL_PREFIX = "http://";
private static final String BULB_CONTEXT = "/BULB/";
private static final String FAN_CONTEXT = "/FAN/";
private static final String TEMPERATURE_CONTEXT = "/TEMP/";
private static final String SONAR_CONTEXT = "/SONAR/";
private static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/";
public static final String XMPP_PROTOCOL = "XMPP";
public static final String HTTP_PROTOCOL = "HTTP";
@ -149,8 +150,12 @@ public class FireAlarmControllerService {
sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true);
break;
case MQTT_PROTOCOL:
callUrlPattern = BULB_CONTEXT.replace("/", "");
sendCommandViaMQTT(owner, deviceId, callUrlPattern, switchToState);
sendCommandViaMQTT(owner, deviceId, BULB_CONTEXT.replace("/", ""),
switchToState);
break;
case XMPP_PROTOCOL:
// requestBulbChangeViaXMPP(switchToState, response);
sendCommandViaXMPP(owner, deviceId, BULB_CONTEXT, switchToState);
break;
default:
if (protocolString == null) {
@ -172,76 +177,74 @@ public class FireAlarmControllerService {
}
@Path("/fan/{state}")
@POST
public void switchFan(@HeaderParam("owner") String owner,
@HeaderParam("deviceId") String deviceId,
@HeaderParam("protocol") String protocol,
@PathParam("state") String state,
@Context HttpServletResponse response) {
@Path("/readsonar")
@GET
public String requestSonarReading(@HeaderParam("owner") String owner,
@HeaderParam("deviceId") String deviceId,
@HeaderParam("protocol") String protocol,
@Context HttpServletResponse response) {
String replyMsg = "";
DeviceValidator deviceValidator = new DeviceValidator();
try {
DeviceValidator deviceValidator = new DeviceValidator();
if (!deviceValidator.isExist(owner, new DeviceIdentifier(deviceId,
FireAlarmConstants
.DEVICE_TYPE))) {
response.setStatus(HttpStatus.SC_UNAUTHORIZED);
return;
return "Unauthorized Access";
}
} catch (DeviceManagementException e) {
log.error("DeviceValidation Failed for deviceId: " + deviceId + " of user: " + owner);
replyMsg = e.getErrorMessage();
response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
return;
}
String switchToState = state.toUpperCase();
if (!switchToState.equals(FireAlarmConstants.STATE_ON) && !switchToState.equals(
FireAlarmConstants.STATE_OFF)) {
log.error("The requested state change shoud be either - 'ON' or 'OFF'");
response.setStatus(HttpStatus.SC_BAD_REQUEST);
return;
return replyMsg;
}
String deviceIP = deviceToIpMap.get(deviceId);
String deviceIp = deviceToIpMap.get(deviceId);
if (deviceIP == null) {
if (deviceIp == null) {
replyMsg = "IP not registered for device: " + deviceId + " of owner: " + owner;
response.setStatus(HttpStatus.SC_PRECONDITION_FAILED);
return;
return replyMsg;
}
String protocolString = protocol.toUpperCase();
String callUrlPattern = FAN_CONTEXT + switchToState;
log.info("Sending command: '" + callUrlPattern + "' to firealarm at: " + deviceIP + " " +
"via" + " " + protocol);
try {
switch (protocolString) {
switch (protocol) {
case HTTP_PROTOCOL:
sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true);
log.info("Sending request to read sonar value at : " + deviceIp +
" via " + HTTP_PROTOCOL);
replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false);
break;
case MQTT_PROTOCOL:
callUrlPattern = FAN_CONTEXT.replace("/", "");
sendCommandViaMQTT(owner, deviceId, callUrlPattern, switchToState);
case XMPP_PROTOCOL:
log.info("Sending request to read sonar value at : " + deviceIp +
" via " +
XMPP_PROTOCOL);
replyMsg = sendCommandViaXMPP(owner, deviceId, SONAR_CONTEXT, ".");
break;
default:
if (protocolString == null) {
sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true);
if (protocol == null) {
log.info("Sending request to read sonar value at : " + deviceIp +
" via " + HTTP_PROTOCOL);
replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false);
} else {
replyMsg = "Requested protocol '" + protocol + "' is not supported";
response.setStatus(HttpStatus.SC_NOT_IMPLEMENTED);
return;
return replyMsg;
}
break;
}
} catch (DeviceManagementException e) {
log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" +
" " + protocol);
replyMsg = e.getErrorMessage();
response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
return;
return replyMsg;
}
response.setStatus(HttpStatus.SC_OK);
replyMsg = "The current sonar reading of the device is " + replyMsg;
return replyMsg;
}
@ -288,7 +291,8 @@ public class FireAlarmControllerService {
log.info("Sending request to read firealarm-temperature at : " + deviceIp +
" via " +
XMPP_PROTOCOL);
replyMsg = requestTemperatureViaXMPP(deviceIp, response);
replyMsg = sendCommandViaXMPP(owner, deviceId, TEMPERATURE_CONTEXT, ".");
// replyMsg = requestTemperatureViaXMPP(response);
break;
default:
@ -316,46 +320,53 @@ public class FireAlarmControllerService {
}
public String requestTemperatureViaXMPP(String deviceIp,
@Context HttpServletResponse response) {
String replyMsg = "";
String sep = File.separator;
String scriptsFolder = "repository" + sep + "resources" + sep + "scripts";
String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep
+ "xmpp_client.py";
String command = "python " + scriptPath;
replyMsg = executeCommand(command);
response.setStatus(HttpStatus.SC_OK);
return replyMsg;
}
private String executeCommand(String command) {
StringBuffer output = new StringBuffer();
Process p;
try {
p = Runtime.getRuntime().exec(command);
p.waitFor();
BufferedReader reader =
new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = "";
while ((line = reader.readLine()) != null) {
output.append(line + "\n");
}
} catch (Exception e) {
log.info(e.getMessage(), e);
}
return output.toString();
}
// public String requestTemperatureViaXMPP(@Context HttpServletResponse response) {
// String replyMsg = "";
//
// String sep = File.separator;
// String scriptsFolder = "repository" + sep + "resources" + sep + "scripts";
// String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep
// + "xmpp_client.py -r Temperature";
// String command = "python " + scriptPath;
//
// replyMsg = executeCommand(command);
//
// response.setStatus(HttpStatus.SC_OK);
// return replyMsg;
// }
// public String requestSonarViaXMPP(@Context HttpServletResponse response) {
// String replyMsg = "";
//
// String sep = File.separator;
// String scriptsFolder = "repository" + sep + "resources" + sep + "scripts";
// String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep
// + "xmpp_client.py -r Sonar";
// String command = "python " + scriptPath;
//
// replyMsg = executeCommand(command);
//
// response.setStatus(HttpStatus.SC_OK);
// return replyMsg;
// }
// public String requestBulbChangeViaXMPP(String state,
// @Context HttpServletResponse response) {
// String replyMsg = "";
//
// String sep = File.separator;
// String scriptsFolder = "repository" + sep + "resources" + sep + "scripts";
// String scriptPath = CarbonUtils.getCarbonHome() + sep + scriptsFolder + sep
// + "xmpp_client.py -r Bulb -s " + state;
// String command = "python " + scriptPath;
//
// replyMsg = executeCommand(command);
//
// response.setStatus(HttpStatus.SC_OK);
// return replyMsg;
// }
@Path("/push_temperature")
@POST
@ -441,10 +452,10 @@ public class FireAlarmControllerService {
if (sensors.length == 3) {
String temperature = sensors[0];
String bulb = sensors[1];
String fan = sensors[2];
String sonar = sensors[2];
sensorValues = "Temperature:" + temperature + "C\tBulb Status:" + bulb +
"\t\tFan Status:" + fan;
"\t\tSonar Status:" + sonar;
log.info(sensorValues);
DeviceController deviceController = new DeviceController();
result = deviceController.pushBamData(dataMsg.owner, FireAlarmConstants
@ -476,12 +487,12 @@ public class FireAlarmControllerService {
.DEVICE_TYPE,
dataMsg.deviceId,
System.currentTimeMillis(), "DeviceData",
fan,
"FAN");
sonar,
"SONAR");
if (!result) {
response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
log.error("Error whilst pushing Fan data: " + sensorValues);
log.error("Error whilst pushing Sonar data: " + sensorValues);
}
} else {
@ -504,6 +515,89 @@ public class FireAlarmControllerService {
}
private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource,
String state) throws DeviceManagementException {
String replyMsg = "";
String scriptArguments = "";
String command = "";
String seperator = File.separator;
String xmppServerURL = XmppConfig.getInstance().getXmppEndpoint();
int indexOfChar = xmppServerURL.lastIndexOf(seperator);
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring((indexOfChar + 1), xmppServerURL.length());
}
indexOfChar = xmppServerURL.indexOf(":");
if (indexOfChar != -1) {
xmppServerURL = xmppServerURL.substring(0, indexOfChar);
}
String xmppAdminUName = XmppConfig.getInstance().getXmppUsername();
String xmppAdminPass = XmppConfig.getInstance().getXmppPassword();
String xmppAdminUserLogin = xmppAdminUName + "@" + xmppServerURL + seperator + deviceOwner;
String clientToConnect = deviceId + "@" + xmppServerURL + seperator + deviceOwner;
String scriptsFolder = "repository" + seperator + "resources" + seperator + "scripts";
String scriptPath = CarbonUtils.getCarbonHome() + seperator + scriptsFolder + seperator
+ "xmpp_client.py ";
scriptArguments = "-j " + xmppAdminUserLogin + " -p " + xmppAdminPass + " -c " + clientToConnect + " -r " + resource + " -s " + state;
command = "python " + scriptPath + scriptArguments;
if (log.isDebugEnabled()){
log.debug("Connecting to XMPP Server via Admin credentials: " + xmppAdminUserLogin);
log.debug("Trying to contact xmpp device account: " + clientToConnect);
log.debug("Arguments used for the scripts: '" + scriptArguments + "'");
log.debug("Command exceuted: '" + command + "'");
}
// switch (resource) {
// case BULB_CONTEXT:
// scriptArguments = "-r Bulb -s " + state;
// command = "python " + scriptPath + scriptArguments;
// break;
// case SONAR_CONTEXT:
// scriptArguments = "-r Sonar";
// command = "python " + scriptPath + scriptArguments;
// break;
// case TEMPERATURE_CONTEXT:
// scriptArguments = "-r Temperature";
// command = "python " + scriptPath + scriptArguments;
// break;
// }
replyMsg = executeCommand(command);
return replyMsg;
}
private String executeCommand(String command) {
StringBuffer output = new StringBuffer();
Process p;
try {
p = Runtime.getRuntime().exec(command);
p.waitFor();
BufferedReader reader =
new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = "";
while ((line = reader.readLine()) != null) {
output.append(line + "\n");
}
} catch (Exception e) {
log.info(e.getMessage(), e);
}
return output.toString();
}
private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource,
String state) throws DeviceManagementException {

@ -18,7 +18,6 @@
* limitations under the License.
*/
"""
import getpass
import logging
import sys
from optparse import OptionParser
@ -34,6 +33,7 @@ from sleekxmpp.exceptions import IqError, IqTimeout
# ourselves to UTF-8.
if sys.version_info < (3, 0):
from sleekxmpp.util.misc_ops import setdefaultencoding
setdefaultencoding('utf8')
else:
raw_input = input
@ -41,6 +41,7 @@ else:
# from sleekxmpp.plugins.xep_0323.device import Device
PRINT_HEADER_LENGTH = 40
class IoT_TestDevice(sleekxmpp.ClientXMPP):
"""
A simple IoT device that can act as client
@ -53,15 +54,17 @@ class IoT_TestDevice(sleekxmpp.ClientXMPP):
python xmpp_client.py -j "bob@yourdomain.com" -p "password" -c "alice@yourdomain.com/device1" {--[debug|quiet]}
python xmpp_client.py -j "bob@127.0.0.1" -p "password" -c "alice@127.0.0.1/device1" {--[debug|quiet]}
"""
def __init__(self, jid, password, sensorjid):
def __init__(self, jid, password, sensorjid, sensorType):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.add_event_handler("session_start", self.session_start)
self.add_event_handler("message", self.message)
self.device=None
self.releaseMe=False
self.device = None
self.releaseMe = False
self.target_jid = sensorjid
#self.beServer=True
#self.clientJID=None
self.requestType = sensorType
# self.beServer=True
# self.clientJID=None
def datacallback(self, from_jid, result, nodeId=None, timestamp=None, fields=None,
error_msg=None):
@ -70,24 +73,28 @@ class IoT_TestDevice(sleekxmpp.ClientXMPP):
se script below for the registration of the callback
"""
logging.debug("we got data %s from %s", str(result), from_jid)
if(result=="fields"):
if (result == "fields"):
header = 'XEP 302 Sensor Data'
logging.info('-' * PRINT_HEADER_LENGTH)
gap = ' '* ((PRINT_HEADER_LENGTH - len(header)) / 2)
gap = ' ' * ((PRINT_HEADER_LENGTH - len(header)) / 2)
logging.info(gap + header)
logging.info('-' * PRINT_HEADER_LENGTH)
logging.debug("RECV:"+str(fields))
logging.debug("RECV:" + str(fields))
if len(fields) > 0:
logging.info("Name\t\tType\tValue\tUnit")
for field in fields:
logging.info(" - " + field["name"] + "\t" + field["typename"] + "\t" + field["value"] + "\t" + field["unit"])
logging.info(" - " + field["name"] + "\t" + field["typename"] + "\t" + field[
"value"] + "\t" + field["unit"])
if self.requestType in ("/" + field["name"].upper() + "/."):
print field["value"]
logging.info('-' * PRINT_HEADER_LENGTH)
self.disconnect()
print field["value"]
def testForRelease(self):
# todo thread safe
@ -95,20 +102,20 @@ class IoT_TestDevice(sleekxmpp.ClientXMPP):
def doReleaseMe(self):
# todo thread safe
self.releaseMe=True
self.releaseMe = True
def addDevice(self, device):
self.device=device
self.device = device
def session_start(self, event):
self.send_presence()
self.get_roster()
# tell your preffered friend that you are alive using generic xmpp chat protocol
#self.send_message(mto='jocke@jabber.sust.se', mbody=self.boundjid.bare +' is now online use xep_323 stanza to talk to me')
# self.send_message(mto='jocke@jabber.sust.se', mbody=self.boundjid.bare +' is now online use xep_323 stanza to talk to me')
#-------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
# Service Discovery
#-------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
try:
# By using block=True, the result stanza will be
# returned. Execution will block until the reply is
@ -125,7 +132,7 @@ class IoT_TestDevice(sleekxmpp.ClientXMPP):
else:
header = 'XMPP Service Discovery'
logging.info('-' * PRINT_HEADER_LENGTH)
gap = ' '* ((PRINT_HEADER_LENGTH - len(header)) / 2)
gap = ' ' * ((PRINT_HEADER_LENGTH - len(header)) / 2)
logging.info(gap + header)
logging.info('-' * PRINT_HEADER_LENGTH)
@ -134,41 +141,35 @@ class IoT_TestDevice(sleekxmpp.ClientXMPP):
for feature in info['disco_info']['features']:
logging.info(' - %s' % feature)
#-------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
# Requesting data through XEP0323
#-------------------------------------------------------------------------------------------
session = self['xep_0323'].request_data(self.boundjid.full, self.target_jid,
self.datacallback, flags={"momentary": "true"})
# -------------------------------------------------------------------------------------------
logging.info('-' * PRINT_HEADER_LENGTH)
logging.info("Sending Request: %s to %s", self.requestType, self.target_jid)
if self.requestType in ('/TEMPERATURE/.', '/SONAR/.'):
session = self['xep_0323'].request_data(self.boundjid.full, self.target_jid,
self.datacallback, flags={"momentary": "true"})
else:
self.send_message(mto=self.target_jid,
mbody=self.requestType,
mtype='chat')
# Using wait=True ensures that the send queue will be
# emptied before ending the session.
self.disconnect(wait=True)
print ("Bulb state switched - " + self.requestType)
def message(self, msg):
if msg['type'] in ('chat', 'normal'):
logging.debug("got normal chat message" + str(msg))
logging.info("got normal chat message" + str(msg))
ipPublic = urlopen('http://icanhazip.com').read()
ipSocket = socket.gethostbyname(socket.gethostname())
msg.reply("Hi I am " + self.boundjid.full + " and I am on IP " + ipSocket + " use xep_323 stanza to talk to me").send()
msg.reply(
"Hi I am " + self.boundjid.full + " and I am on IP " + ipSocket + " use xep_323 stanza to talk to me").send()
else:
logging.debug("got unknown message type %s", str(msg['type']))
# class TheDevice(Device):
# """
# This is the actual device object that you will use to get information from your real hardware
# You will be called in the refresh method when someone is requesting information from you
# """
# def __init__(self,nodeId):
# Device.__init__(self,nodeId)
# self.counter=0
#
# def refresh(self,fields):
# """
# the implementation of the refresh method
# """
# #self._set_momentary_timestamp(self._get_timestamp())
# self.counter+=1
# #self._add_field_momentary_data(self, "Temperature", self.counter)
#
# self._add_field(name="Temperature", typename="numeric", unit="C")
# self._set_momentary_timestamp(self._get_timestamp())
# self._add_field_momentary_data("Temperature", str(self.counter), flags={"automaticReadout": "true"})
logging.info("got unknown message type %s", str(msg['type']))
if __name__ == '__main__':
@ -206,36 +207,42 @@ if __name__ == '__main__':
optp.add_option("-c", "--sensorjid", dest="sensorjid",
help="Another device to call for data on", default=None)
optp.add_option("-r", "--sensorType", dest="sensorType",
help="The type of the sensor info requested", default="/TEMPERATURE/")
optp.add_option("-s", "--sensorState", dest="sensorState",
help="The state of the sensor to switch to", default="")
opts, args = optp.parse_args()
# Setup logging.
# Setup logging.
logging.basicConfig(level=opts.loglevel,
format='%(levelname)-8s %(message)s')
if opts.jid is None:
#opts.jid = raw_input("Username: ")
opts.jid = "admin@204.232.188.215/raspi"
opts.jid = "admin@204.232.188.215/admin"
if opts.password is None:
opts.password = "wso2iot123"
#opts.password = getpass.getpass("Password: ")
if opts.sensorjid is None:
opts.sensorjid = "1hrdqr4r2ymhy@204.232.188.215/raspi"
#opts.sensorjid = getpass.getpass("Sensor JID: ")
opts.password = "admin"
# if opts.sensorjid is None:
# opts.sensorjid = "t4ibkqs8t7ox@204.232.188.215/admin"
#-------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------
# Starting XMPP with XEP0030, XEP0323, XEP0325
#-------------------------------------------------------------------------------------------
xmpp = IoT_TestDevice(opts.jid, opts.password, opts.sensorjid)
# -------------------------------------------------------------------------------------------
requestState = opts.sensorType + opts.sensorState
xmpp = IoT_TestDevice(opts.jid, opts.password, opts.sensorjid, requestState)
xmpp.register_plugin('xep_0030')
xmpp.register_plugin('xep_0323')
xmpp.register_plugin('xep_0325')
if opts.sensorjid:
logging.debug("will try to call another device for data")
logging.debug("Will try to call another device for data")
# xmpp.beClientOrServer(server=False,clientJID=opts.sensorjid)
xmpp.connect()
xmpp.process(block=True)
logging.debug("ready ending")
else:
print "noopp didn't happen"
print "ID of the client to communicate-to not given..."

Loading…
Cancel
Save