Modifications to the RPi Agent to publish to the DAS MQTT Receiver for temperature

revert-dabc3590
Shabirmean 9 years ago
parent 07bc52e86c
commit 259405995a

@ -19,7 +19,7 @@
<eventReceiver name="temperature-mqtt" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <eventReceiver name="temperature-mqtt" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="oauth-mqtt"> <from eventAdapterType="oauth-mqtt">
<property name="topic">wso2/carbon.super/+/temperature</property> <property name="topic">wso2/carbon.super/+/+/temperature</property>
<property name="username">admin</property> <property name="username">admin</property>
<property name="contentValidationParams">device_id_json_path:event.metaData.deviceId,device_id_topic_hierarchy_index:3</property> <property name="contentValidationParams">device_id_json_path:event.metaData.deviceId,device_id_topic_hierarchy_index:3</property>
<property name="contentValidation">default</property> <property name="contentValidation">default</property>

@ -22,11 +22,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject;
import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService; import org.wso2.carbon.apimgt.application.extension.APIManagementProviderService;
import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey; import org.wso2.carbon.apimgt.application.extension.dto.ApiApplicationKey;
import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException; import org.wso2.carbon.apimgt.application.extension.exception.APIManagerException;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig; import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.exception.RaspberrypiException;
import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.util.APIUtil; import org.wso2.carbon.device.mgt.iot.raspberrypi.service.impl.util.APIUtil;
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants; import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException; import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
@ -37,10 +43,12 @@ import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientExceptio
import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.user.api.UserStoreException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.PublicKey;
import java.util.UUID; import java.util.UUID;
public class RaspberryPiMQTTConnector extends MQTTTransportHandler { public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class); private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class);
// subscribeTopic is not used for the RaspberryPi sample since the DAS device directly publishes to DAS MQTT receiver
private static final String subscribeTopic = "wso2/+/"+ RaspberrypiConstants.DEVICE_TYPE + "/+/publisher"; private static final String subscribeTopic = "wso2/+/"+ RaspberrypiConstants.DEVICE_TYPE + "/+/publisher";
private static final String KEY_TYPE = "PRODUCTION"; private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = ""; private static final String EMPTY_STRING = "";
@ -104,7 +112,26 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
} }
@Override @Override
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException { public void publishDeviceData(String... publishData) throws TransportHandlerException {
if (publishData.length != 3) {
String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " +
"Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]";
log.error(errorMsg);
throw new TransportHandlerException(errorMsg);
}
String deviceId = publishData[0];
String resource = publishData[1];
String state = publishData[2];
MqttMessage pushMessage = new MqttMessage();
String publishTopic = "wso2/" + APIUtil.getTenantDomainOftheUser() + "/"
+ RaspberrypiConstants.DEVICE_TYPE + "/" + deviceId;
String actualMessage = resource + ":" + state;
pushMessage.setPayload(actualMessage.getBytes(StandardCharsets.UTF_8));
pushMessage.setQos(DEFAULT_MQTT_QUALITY_OF_SERVICE);
pushMessage.setRetained(false);
publishToQueue(publishTopic, pushMessage);
} }
@Override @Override
@ -135,6 +162,10 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
terminatorThread.start(); terminatorThread.start();
} }
@Override
public void processIncomingMessage(MqttMessage mqttMessage, String... messageParams) throws TransportHandlerException {
}
@Override @Override
public void processIncomingMessage() throws TransportHandlerException { public void processIncomingMessage() throws TransportHandlerException {
@ -154,28 +185,5 @@ public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException { public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException {
} }
@Override
public void publishDeviceData(String... publishData) throws TransportHandlerException {
if (publishData.length != 3) {
String errorMsg = "Incorrect number of arguments received to SEND-MQTT Message. " +
"Need to be [owner, deviceId, resource{BULB/TEMP}, state{ON/OFF or null}]";
log.error(errorMsg);
throw new TransportHandlerException(errorMsg);
}
String deviceId = publishData[0];
String resource = publishData[1];
String state = publishData[2];
MqttMessage pushMessage = new MqttMessage();
String publishTopic = "wso2/" + APIUtil.getTenantDomainOftheUser() + "/"
+ RaspberrypiConstants.DEVICE_TYPE + "/" + deviceId;
String actualMessage = resource + ":" + state;
pushMessage.setPayload(actualMessage.getBytes(StandardCharsets.UTF_8));
pushMessage.setQos(DEFAULT_MQTT_QUALITY_OF_SERVICE);
pushMessage.setRetained(false);
publishToQueue(publishTopic, pushMessage);
}
} }

@ -22,7 +22,7 @@
import logging, logging.handlers import logging, logging.handlers
import sys, os, signal, argparse import sys, os, signal, argparse
import running_mode import running_mode
import time, threading, datetime import time, threading, datetime, calendar
# import httplib, ssl # import httplib, ssl
# from functools import wraps # from functools import wraps
@ -176,9 +176,9 @@ def configureLogger(loggerName):
# This method connects to the Device-Cloud and pushes data # This method connects to the Device-Cloud and pushes data
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def connectAndPushData(): def connectAndPushData():
currentTime = calendar.timegm(time.gmtime())
rPiTemperature = iotUtils.LAST_TEMP # Push the last read temperature value rPiTemperature = iotUtils.LAST_TEMP # Push the last read temperature value
PUSH_DATA = iotUtils.DEVICE_INFO + iotUtils.DEVICE_DATA.format(temperature=rPiTemperature) PUSH_DATA = iotUtils.DEVICE_INFO.format(currentTime, rPiTemperature)
PUSH_DATA += '}'
print '~~~~~~~~~~~~~~~~~~~~~~~~ Publishing Device-Data ~~~~~~~~~~~~~~~~~~~~~~~~~' print '~~~~~~~~~~~~~~~~~~~~~~~~ Publishing Device-Data ~~~~~~~~~~~~~~~~~~~~~~~~~'
print ('PUBLISHED DATA: ' + PUSH_DATA) print ('PUBLISHED DATA: ' + PUSH_DATA)

@ -26,7 +26,7 @@ import random
import running_mode import running_mode
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# HOST_NAME(IP) of the Device # HOST_NAME(IP) of the Device
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
global HOST_NAME global HOST_NAME
HOST_NAME = "0.0.0.0" HOST_NAME = "0.0.0.0"
@ -52,6 +52,7 @@ configParser = ConfigParser.RawConfigParser()
configFilePath = os.path.join(os.path.dirname(__file__), './deviceConfig.properties') configFilePath = os.path.join(os.path.dirname(__file__), './deviceConfig.properties')
configParser.read(configFilePath) configParser.read(configFilePath)
SERVER_NAME = configParser.get('Device-Configurations', 'server-name')
DEVICE_OWNER = configParser.get('Device-Configurations', 'owner') DEVICE_OWNER = configParser.get('Device-Configurations', 'owner')
DEVICE_ID = configParser.get('Device-Configurations', 'deviceId') DEVICE_ID = configParser.get('Device-Configurations', 'deviceId')
MQTT_EP = configParser.get('Device-Configurations', 'mqtt-ep') MQTT_EP = configParser.get('Device-Configurations', 'mqtt-ep')
@ -60,12 +61,18 @@ AUTH_TOKEN = configParser.get('Device-Configurations', 'auth-token')
CONTROLLER_CONTEXT = configParser.get('Device-Configurations', 'controller-context') CONTROLLER_CONTEXT = configParser.get('Device-Configurations', 'controller-context')
MQTT_SUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-sub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID) MQTT_SUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-sub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID)
MQTT_PUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-pub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID) MQTT_PUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-pub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID)
DEVICE_INFO = '{"owner":"' + DEVICE_OWNER + '","deviceId":"' + DEVICE_ID + '","temperature":' DEVICE_INFO = '{{"event":{{"metaData":{{"owner":"' + DEVICE_OWNER + '","type":"raspberrypi","deviceId":"' + DEVICE_ID + '","time":{:.2f}}},"payloadData":{{"temperature":{:.2f}}}}}}}'
# '{"owner":"' + DEVICE_OWNER + '","deviceId":"' + DEVICE_ID + '","temperature":'
HTTPS_EP = configParser.get('Device-Configurations', 'https-ep') HTTPS_EP = configParser.get('Device-Configurations', 'https-ep')
HTTP_EP = configParser.get('Device-Configurations', 'http-ep') HTTP_EP = configParser.get('Device-Configurations', 'http-ep')
APIM_EP = configParser.get('Device-Configurations', 'apim-ep') APIM_EP = configParser.get('Device-Configurations', 'apim-ep')
DEVICE_IP = '"{ip}","value":' # DEVICE_IP = '"{ip}","value":'
DEVICE_DATA = '"{temperature}"' # '"{temperature}:{load}:OFF"' # DEVICE_DATA = '"{temperature}"' # '"{temperature}:{load}:OFF"'
# {"event": {"metaData": {"owner": "admin", "type": "arduino","deviceId": "s15kdwf34vue","time": 0},"payloadData": { "temperature": 22} }}
### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -184,4 +191,4 @@ def main():
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -78,20 +78,22 @@ def publish(msg):
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# The Main method of the server script # The Main method of the server script
# This method is invoked from RaspberryStats.py on a new thread # This method is invoked from RaspberryStats.py on a new thread
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def main(): def main():
MQTT_ENDPOINT = iotUtils.MQTT_EP.split(":") MQTT_ENDPOINT = iotUtils.MQTT_EP.split(":")
MQTT_IP = MQTT_ENDPOINT[1].replace('//','') MQTT_IP = MQTT_ENDPOINT[1].replace('//','')
MQTT_PORT = int(MQTT_ENDPOINT[2]) MQTT_PORT = int(MQTT_ENDPOINT[2])
DEV_OWNER = iotUtils.DEVICE_OWNER SERVER_NAME = iotUtils.SERVER_NAME
DEV_ID = iotUtils.DEVICE_ID DEV_ID = iotUtils.DEVICE_ID
global TOPIC_TO_SUBSCRIBE global TOPIC_TO_SUBSCRIBE
TOPIC_TO_SUBSCRIBE = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID # TOPIC_TO_SUBSCRIBE = SERVER_NAME + "/raspberrypi/" + DEV_ID
TOPIC_TO_SUBSCRIBE = SERVER_NAME + "/raspberrypi/" + DEV_ID
global TOPIC_TO_PUBLISH global TOPIC_TO_PUBLISH
TOPIC_TO_PUBLISH = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID + "/publisher" # TOPIC_TO_PUBLISH = SERVER_NAME + "/raspberrypi/" + DEV_ID + "/publisher"
TOPIC_TO_PUBLISH = SERVER_NAME + "/raspberrypi/" + DEV_ID + "/temperature"
print ("MQTT_LISTENER: MQTT_ENDPOINT is " + str(MQTT_ENDPOINT)) print ("MQTT_LISTENER: MQTT_ENDPOINT is " + str(MQTT_ENDPOINT))
print ("MQTT_LISTENER: MQTT_TOPIC is " + TOPIC_TO_SUBSCRIBE) print ("MQTT_LISTENER: MQTT_TOPIC is " + TOPIC_TO_SUBSCRIBE)

Loading…
Cancel
Save