From 259405995a535119cf5d3704bb5c3fba696af808 Mon Sep 17 00:00:00 2001 From: Shabirmean Date: Wed, 4 May 2016 00:47:52 +0530 Subject: [PATCH] Modifications to the RPi Agent to publish to the DAS MQTT Receiver for temperature --- .../EventReceiver_mqtt_temperature.xml | 2 +- .../transport/RaspberryPiMQTTConnector.java | 56 +++++++++++-------- .../resources/agent/src/RaspberryAgent.py | 6 +- .../src/main/resources/agent/src/iotUtils.py | 17 ++++-- .../main/resources/agent/src/mqttConnector.py | 10 ++-- 5 files changed, 54 insertions(+), 37 deletions(-) diff --git a/components/iot-plugins/iot-analytics/org.wso2.carbon.device.mgt.iot.analytics/src/main/resources/carbonapps/Temperature/Eventreceiver_mqtt_temperature_1.0.0/EventReceiver_mqtt_temperature.xml b/components/iot-plugins/iot-analytics/org.wso2.carbon.device.mgt.iot.analytics/src/main/resources/carbonapps/Temperature/Eventreceiver_mqtt_temperature_1.0.0/EventReceiver_mqtt_temperature.xml index 1166733c4d..1de2653afb 100644 --- a/components/iot-plugins/iot-analytics/org.wso2.carbon.device.mgt.iot.analytics/src/main/resources/carbonapps/Temperature/Eventreceiver_mqtt_temperature_1.0.0/EventReceiver_mqtt_temperature.xml +++ b/components/iot-plugins/iot-analytics/org.wso2.carbon.device.mgt.iot.analytics/src/main/resources/carbonapps/Temperature/Eventreceiver_mqtt_temperature_1.0.0/EventReceiver_mqtt_temperature.xml @@ -19,7 +19,7 @@ - wso2/carbon.super/+/temperature + wso2/carbon.super/+/+/temperature admin device_id_json_path:event.metaData.deviceId,device_id_topic_hierarchy_index:3 default diff --git a/components/iot-plugins/raspberrypi-plugin/org.wso2.carbon.device.mgt.iot.raspberrypi.api/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/impl/transport/RaspberryPiMQTTConnector.java b/components/iot-plugins/raspberrypi-plugin/org.wso2.carbon.device.mgt.iot.raspberrypi.api/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/impl/transport/RaspberryPiMQTTConnector.java index 83bdd863c1..48c874fc44 100644 --- a/components/iot-plugins/raspberrypi-plugin/org.wso2.carbon.device.mgt.iot.raspberrypi.api/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/impl/transport/RaspberryPiMQTTConnector.java +++ b/components/iot-plugins/raspberrypi-plugin/org.wso2.carbon.device.mgt.iot.raspberrypi.api/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/impl/transport/RaspberryPiMQTTConnector.java @@ -22,11 +22,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.json.JSONObject; import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService; import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey; import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException; import org.wso2.carbon.context.PrivilegedCarbonContext; +import org.wso2.carbon.device.mgt.common.Device; +import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.device.mgt.common.DeviceManagementException; +import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; +import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.exception.RaspberrypiException; import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.util.APIUtil; import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; @@ -37,10 +43,12 @@ import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientExceptio import org.wso2.carbon.user.api.UserStoreException; import java.nio.charset.StandardCharsets; +import java.security.PublicKey; import java.util.UUID; public class RaspberryPiMQTTConnector extends MQTTTransportHandler { private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class); +// subscribeTopic is not used for the RaspberryPi sample since the DAS device directly publishes to DAS MQTT receiver private static final String subscribeTopic = "wso2/+/"+ RaspberrypiConstants.DEVICE_TYPE + "/+/publisher"; private static final String KEY_TYPE = "PRODUCTION"; private static final String EMPTY_STRING = ""; @@ -104,7 +112,26 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler { } @Override - public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException { + public void publishDeviceData(String... publishData) throws TransportHandlerException { + if (publishData.length != 3) { + String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " + + "Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]"; + log.error(errorMsg); + throw new TransportHandlerException(errorMsg); + } + + String deviceId = publishData[0]; + String resource = publishData[1]; + String state = publishData[2]; + + MqttMessage pushMessage = new MqttMessage(); + String publishTopic = "wso2/" + APIUtil.getTenantDomainOftheUser() + "/" + + RaspberrypiConstants.DEVICE_TYPE + "/" + deviceId; + String actualMessage = resource + ":" + state; + pushMessage.setPayload(actualMessage.getBytes(StandardCharsets.UTF_8)); + pushMessage.setQos(DEFAULT_MQTT_QUALITY_OF_SERVICE); + pushMessage.setRetained(false); + publishToQueue(publishTopic, pushMessage); } @Override @@ -135,6 +162,10 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler { terminatorThread.start(); } + @Override + public void processIncomingMessage(MqttMessage mqttMessage, String... messageParams) throws TransportHandlerException { + } + @Override public void processIncomingMessage() throws TransportHandlerException { @@ -154,28 +185,5 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler { public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException { } - - @Override - public void publishDeviceData(String... publishData) throws TransportHandlerException { - if (publishData.length != 3) { - String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " + - "Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]"; - log.error(errorMsg); - throw new TransportHandlerException(errorMsg); - } - - String deviceId = publishData[0]; - String resource = publishData[1]; - String state = publishData[2]; - - MqttMessage pushMessage = new MqttMessage(); - String publishTopic = "wso2/" + APIUtil.getTenantDomainOftheUser() + "/" - + RaspberrypiConstants.DEVICE_TYPE + "/" + deviceId; - String actualMessage = resource + ":" + state; - pushMessage.setPayload(actualMessage.getBytes(StandardCharsets.UTF_8)); - pushMessage.setQos(DEFAULT_MQTT_QUALITY_OF_SERVICE); - pushMessage.setRetained(false); - publishToQueue(publishTopic, pushMessage); - } } diff --git a/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/RaspberryAgent.py b/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/RaspberryAgent.py index 006134c61a..7b4afed5b9 100644 --- a/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/RaspberryAgent.py +++ b/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/RaspberryAgent.py @@ -22,7 +22,7 @@ import logging, logging.handlers import sys, os, signal, argparse import running_mode -import time, threading, datetime +import time, threading, datetime, calendar # import httplib, ssl # from functools import wraps @@ -176,9 +176,9 @@ def configureLogger(loggerName): # This method connects to the Device-Cloud and pushes data # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def connectAndPushData(): + currentTime = calendar.timegm(time.gmtime()) rPiTemperature = iotUtils.LAST_TEMP # Push the last read temperature value - PUSH_DATA = iotUtils.DEVICE_INFO + iotUtils.DEVICE_DATA.format(temperature=rPiTemperature) - PUSH_DATA += '}' + PUSH_DATA = iotUtils.DEVICE_INFO.format(currentTime, rPiTemperature) print '~~~~~~~~~~~~~~~~~~~~~~~~ Publishing Device-Data ~~~~~~~~~~~~~~~~~~~~~~~~~' print ('PUBLISHED DATA: ' + PUSH_DATA) diff --git a/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/iotUtils.py b/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/iotUtils.py index 0f17985ec5..323cf5ea52 100644 --- a/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/iotUtils.py +++ b/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/iotUtils.py @@ -26,7 +26,7 @@ import random import running_mode # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -# HOST_NAME(IP) of the Device +# HOST_NAME(IP) of the Device # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ global HOST_NAME HOST_NAME = "0.0.0.0" @@ -52,6 +52,7 @@ configParser = ConfigParser.RawConfigParser() configFilePath = os.path.join(os.path.dirname(__file__), './deviceConfig.properties') configParser.read(configFilePath) +SERVER_NAME = configParser.get('Device-Configurations', 'server-name') DEVICE_OWNER = configParser.get('Device-Configurations', 'owner') DEVICE_ID = configParser.get('Device-Configurations', 'deviceId') MQTT_EP = configParser.get('Device-Configurations', 'mqtt-ep') @@ -60,12 +61,18 @@ AUTH_TOKEN = configParser.get('Device-Configurations', 'auth-token') CONTROLLER_CONTEXT = configParser.get('Device-Configurations', 'controller-context') MQTT_SUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-sub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID) MQTT_PUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-pub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID) -DEVICE_INFO = '{"owner":"' + DEVICE_OWNER + '","deviceId":"' + DEVICE_ID + '","temperature":' +DEVICE_INFO = '{{"event":{{"metaData":{{"owner":"' + DEVICE_OWNER + '","type":"raspberrypi","deviceId":"' + DEVICE_ID + '","time":{:.2f}}},"payloadData":{{"temperature":{:.2f}}}}}}}' + +# '{"owner":"' + DEVICE_OWNER + '","deviceId":"' + DEVICE_ID + '","temperature":' HTTPS_EP = configParser.get('Device-Configurations', 'https-ep') HTTP_EP = configParser.get('Device-Configurations', 'http-ep') APIM_EP = configParser.get('Device-Configurations', 'apim-ep') -DEVICE_IP = '"{ip}","value":' -DEVICE_DATA = '"{temperature}"' # '"{temperature}:{load}:OFF"' +# DEVICE_IP = '"{ip}","value":' +# DEVICE_DATA = '"{temperature}"' # '"{temperature}:{load}:OFF"' + + +# {"event": {"metaData": {"owner": "admin", "type": "arduino","deviceId": "s15kdwf34vue","time": 0},"payloadData": { "temperature": 22} }} + ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -184,4 +191,4 @@ def main(): # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/mqttConnector.py b/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/mqttConnector.py index 066a3c6480..f0da9154ce 100644 --- a/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/mqttConnector.py +++ b/features/iot-plugins-feature/raspberrypi-plugin-feature/org.wso2.carbon.device.mgt.iot.raspberrypi.feature/src/main/resources/agent/src/mqttConnector.py @@ -78,20 +78,22 @@ def publish(msg): # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # The Main method of the server script -# This method is invoked from RaspberryStats.py on a new thread +# This method is invoked from RaspberryStats.py on a new thread # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ def main(): MQTT_ENDPOINT = iotUtils.MQTT_EP.split(":") MQTT_IP = MQTT_ENDPOINT[1].replace('//','') MQTT_PORT = int(MQTT_ENDPOINT[2]) - DEV_OWNER = iotUtils.DEVICE_OWNER + SERVER_NAME = iotUtils.SERVER_NAME DEV_ID = iotUtils.DEVICE_ID global TOPIC_TO_SUBSCRIBE - TOPIC_TO_SUBSCRIBE = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID + # TOPIC_TO_SUBSCRIBE = SERVER_NAME + "/raspberrypi/" + DEV_ID + TOPIC_TO_SUBSCRIBE = SERVER_NAME + "/raspberrypi/" + DEV_ID global TOPIC_TO_PUBLISH - TOPIC_TO_PUBLISH = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID + "/publisher" + # TOPIC_TO_PUBLISH = SERVER_NAME + "/raspberrypi/" + DEV_ID + "/publisher" + TOPIC_TO_PUBLISH = SERVER_NAME + "/raspberrypi/" + DEV_ID + "/temperature" print ("MQTT_LISTENER: MQTT_ENDPOINT is " + str(MQTT_ENDPOINT)) print ("MQTT_LISTENER: MQTT_TOPIC is " + TOPIC_TO_SUBSCRIBE)