Changes to enable MQTT Support for RPi DeviceType

revert-dabc3590
Shabirmean 9 years ago
parent 4e51ca9ac4
commit 07bc52e86c

@ -41,7 +41,7 @@ import java.util.UUID;
public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class);
private static final String subscribeTopic = "wso2/" + RaspberrypiConstants.DEVICE_TYPE + "/+/publisher";
private static final String subscribeTopic = "wso2/+/"+ RaspberrypiConstants.DEVICE_TYPE + "/+/publisher";
private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = "";

@ -69,7 +69,7 @@ public class VirtualFireAlarmMQTTConnector extends MQTTTransportHandler {
private static Log log = LogFactory.getLog(VirtualFireAlarmMQTTConnector.class);
// subscription topic: <SERVER_NAME>/+/virtual_firealarm/+/publisher
// wildcard (+) is in place for device_owner & device_id
private static String subscribeTopic = "wso2/+/"+ VirtualFireAlarmConstants.DEVICE_TYPE + "/+/publisher";
private static final String subscribeTopic = "wso2/+/"+ VirtualFireAlarmConstants.DEVICE_TYPE + "/+/publisher";
private static String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
private static final String KEY_TYPE = "PRODUCTION";
private static final String EMPTY_STRING = "";

@ -24,20 +24,20 @@ import sys, os, signal, argparse
import running_mode
import time, threading, datetime
import httplib, ssl
from functools import wraps
# import httplib, ssl
# from functools import wraps
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Overriding the default SSL version used in some of the Python (2.7.x) versions
# This is a known issue in earlier Python releases
# But was fixed in later versions. Ex-2.7.11
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def sslwrap(func):
@wraps(func)
def bar(*args, **kw):
kw['ssl_version'] = ssl.PROTOCOL_TLSv1
return func(*args, **kw)
return bar
# def sslwrap(func):
# @wraps(func)
# def bar(*args, **kw):
# kw['ssl_version'] = ssl.PROTOCOL_TLSv1
# return func(*args, **kw)
# return bar
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
PUSH_INTERVAL = 5000 # time interval between successive data pushes in seconds
@ -81,7 +81,7 @@ if args.interval:
if args.mode:
running_mode.RUNNING_MODE = args.mode
iotUtils = __import__('iotUtils')
mqttSubscriber = __import__('mqttConnector')
mqttConnector = __import__('mqttConnector')
# httpServer = __import__('httpServer') # python script used to start a http-server to listen for operations
# (includes the TEMPERATURE global variable)
@ -176,49 +176,54 @@ def configureLogger(loggerName):
# This method connects to the Device-Cloud and pushes data
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def connectAndPushData():
if sys.version_info<(2,7,9):
dcConnection = httplib.HTTPSConnection(host=DC_IP, port=DC_PORT)
else:
dcConnection = httplib.HTTPSConnection(host=DC_IP, port=DC_PORT, context=ssl._create_unverified_context())
dcConnection.set_debuglevel(1)
dcConnection.connect()
request = dcConnection.putrequest('POST', PUSH_ENDPOINT)
dcConnection.putheader('Authorization', 'Bearer ' + iotUtils.AUTH_TOKEN)
dcConnection.putheader('Content-Type', 'application/json')
### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
### Read the Temperature and Load info of RPi and construct payload
### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
rPiTemperature = iotUtils.LAST_TEMP # Push the last read temperature value
PUSH_DATA = iotUtils.DEVICE_INFO + iotUtils.DEVICE_IP.format(ip=HOST_AND_PORT) + iotUtils.DEVICE_DATA.format(
temperature=rPiTemperature)
PUSH_DATA = iotUtils.DEVICE_INFO + iotUtils.DEVICE_DATA.format(temperature=rPiTemperature)
PUSH_DATA += '}'
dcConnection.putheader('Content-Length', len(PUSH_DATA))
dcConnection.endheaders()
print PUSH_DATA
print '~~~~~~~~~~~~~~~~~~~~~~~~ Pushing Device-Data ~~~~~~~~~~~~~~~~~~~~~~~~~'
dcConnection.send(PUSH_DATA) # Push the data
dcResponse = dcConnection.getresponse()
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
print ('RASPBERRY_STATS: ' + str(dcResponse.status))
print ('RASPBERRY_STATS: ' + str(dcResponse.reason))
print ('RASPBERRY_STATS: Response Message')
print str(dcResponse.msg)
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
dcConnection.close()
if (dcResponse.status == 409 or dcResponse.status == 412):
print 'RASPBERRY_STATS: Re-registering Device IP'
registerDeviceIP()
print '~~~~~~~~~~~~~~~~~~~~~~~~ Publishing Device-Data ~~~~~~~~~~~~~~~~~~~~~~~~~'
print ('PUBLISHED DATA: ' + PUSH_DATA)
print ('PUBLISHED TOPIC: ' + mqttConnector.TOPIC_TO_PUBLISH)
mqttConnector.publish(PUSH_DATA)
# print '~~~~~~~~~~~~~~~~~~~~~~~~ End Of Publishing ~~~~~~~~~~~~~~~~~~~~~~~~~'
# if sys.version_info<(2,7,9):
# dcConnection = httplib.HTTPSConnection(host=DC_IP, port=DC_PORT)
# else:
# dcConnection = httplib.HTTPSConnection(host=DC_IP, port=DC_PORT, context=ssl._create_unverified_context())
# dcConnection.set_debuglevel(1)
# dcConnection.connect()
# request = dcConnection.putrequest('POST', PUSH_ENDPOINT)
# dcConnection.putheader('Authorization', 'Bearer ' + iotUtils.AUTH_TOKEN)
# dcConnection.putheader('Content-Type', 'application/json')
# ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ### Read the Temperature and Load info of RPi and construct payload
# ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# rPiTemperature = iotUtils.LAST_TEMP # Push the last read temperature value
# PUSH_DATA = iotUtils.DEVICE_INFO + iotUtils.DEVICE_IP.format(ip=HOST_AND_PORT) + iotUtils.DEVICE_DATA.format(
# temperature=rPiTemperature)
# PUSH_DATA += '}'
# dcConnection.putheader('Content-Length', len(PUSH_DATA))
# dcConnection.endheaders()
# print PUSH_DATA
# print '~~~~~~~~~~~~~~~~~~~~~~~~ Pushing Device-Data ~~~~~~~~~~~~~~~~~~~~~~~~~'
# dcConnection.send(PUSH_DATA) # Push the data
# dcResponse = dcConnection.getresponse()
# print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
# print ('RASPBERRY_STATS: ' + str(dcResponse.status))
# print ('RASPBERRY_STATS: ' + str(dcResponse.reason))
# print ('RASPBERRY_STATS: Response Message')
# print str(dcResponse.msg)
# print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
# dcConnection.close()
# if (dcResponse.status == 409 or dcResponse.status == 412):
# print 'RASPBERRY_STATS: Re-registering Device IP'
# registerDeviceIP()
### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -255,9 +260,9 @@ class TemperatureReaderThread(object):
print 'RASPBERRY_STATS: Temp={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity)
except Exception, e:
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
print "RASPBERRY_STATS: Exception in TempReaderThread: Could not successfully read Temperature"
print ("RASPBERRY_STATS: " + str(e))
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
pass
time.sleep(self.interval)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -298,7 +303,7 @@ class SubscribeToMQTTQueue(object):
thread.start() # Start the execution
def run(self):
mqttSubscriber.main()
mqttConnector.main()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -319,13 +324,11 @@ signal.signal(signal.SIGTERM, sigterm_handler)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def main():
configureLogger("WSO2IOT_RPiStats")
if running_mode.RUNNING_MODE == 'N':
iotUtils.setUpGPIOPins()
UtilsThread()
# registerDeviceIP() # Call the register endpoint and register Device IP
TemperatureReaderThread() # initiates and runs the thread to continuously read temperature from DHT Sensor
# ListenHTTPServerThread() # starts an HTTP Server that listens for operational commands to switch ON/OFF Led
SubscribeToMQTTQueue() # connects and subscribes to an MQTT Queue that receives MQTT commands from the server
TemperatureReaderThread() # initiates and runs the thread to continuously read temperature from DHT Sensor
while True:
try:
if iotUtils.LAST_TEMP > 0: # Push data only if there had been a successful temperature read

@ -60,7 +60,7 @@ AUTH_TOKEN = configParser.get('Device-Configurations', 'auth-token')
CONTROLLER_CONTEXT = configParser.get('Device-Configurations', 'controller-context')
MQTT_SUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-sub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID)
MQTT_PUB_TOPIC = configParser.get('Device-Configurations', 'mqtt-pub-topic').format(owner = DEVICE_OWNER, deviceId = DEVICE_ID)
DEVICE_INFO = '{"owner":"' + DEVICE_OWNER + '","deviceId":"' + DEVICE_ID + '","reply":'
DEVICE_INFO = '{"owner":"' + DEVICE_OWNER + '","deviceId":"' + DEVICE_ID + '","temperature":'
HTTPS_EP = configParser.get('Device-Configurations', 'https-ep')
HTTP_EP = configParser.get('Device-Configurations', 'http-ep')
APIM_EP = configParser.get('Device-Configurations', 'apim-ep')
@ -75,6 +75,7 @@ DEVICE_DATA = '"{temperature}"' # '"{temperature}:{load}:OFF"'
# Method used to switch ON/OFF the LED attached to RPi
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def switchBulb(state):
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
print "Requested Switch State: " + state
if running_mode.RUNNING_MODE == "N":
@ -90,7 +91,6 @@ def switchBulb(state):
print "BULB Switched ON"
elif state == "OFF":
print "BULB Switched OFF"
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -176,7 +176,7 @@ def setUpGPIOPins():
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def main():
global HOST_NAME
HOST_NAME = getDeviceIP()
# HOST_NAME = getDeviceIP()
if running_mode.RUNNING_MODE == 'N':
setUpGPIOPins()

@ -31,14 +31,16 @@ def on_connect(client, userdata, flags, rc):
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
print ("MQTT_LISTENER: Subscribing with topic " + TOPIC)
client.subscribe(TOPIC)
print ("MQTT_LISTENER: Subscribing with topic " + TOPIC_TO_SUBSCRIBE)
client.subscribe(TOPIC_TO_SUBSCRIBE)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print( "MQTT_LISTENER: " + msg.topic + " " + str(msg.payload) )
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
print 'MQTT_LISTENER: Message Received by Device'
print( "MQTT_LISTENER: " + msg.topic + " --> " + str(msg.payload) )
request = str(msg.payload)
@ -58,12 +60,27 @@ def on_message(client, userdata, msg):
elif resource == "BULB":
iotUtils.switchBulb(state)
def on_publish(client, userdata, mid):
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
print 'Temperature Data Published Succesfully'
# print (client)
# print (userdata)
# print (mid)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# The callback for when a PUBLISH message to the server when door is open or close
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def publish(msg):
# global mqttClient
mqttClient.publish(TOPIC_TO_PUBLISH, msg)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# The Main method of the server script
# This method is invoked from RaspberryStats.py on a new thread
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def main():
MQTT_ENDPOINT = iotUtils.MQTT_EP.split(":")
MQTT_IP = MQTT_ENDPOINT[1].replace('//','')
MQTT_PORT = int(MQTT_ENDPOINT[2])
@ -71,18 +88,24 @@ def main():
DEV_OWNER = iotUtils.DEVICE_OWNER
DEV_ID = iotUtils.DEVICE_ID
global TOPIC
TOPIC = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID
global TOPIC_TO_SUBSCRIBE
TOPIC_TO_SUBSCRIBE = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID
global TOPIC_TO_PUBLISH
TOPIC_TO_PUBLISH = "wso2/" + DEV_OWNER + "/raspberrypi/" + DEV_ID + "/publisher"
print ("MQTT_LISTENER: MQTT_ENDPOINT is " + str(MQTT_ENDPOINT))
print ("MQTT_LISTENER: MQTT_TOPIC is " + TOPIC)
print ("MQTT_LISTENER: MQTT_TOPIC is " + TOPIC_TO_SUBSCRIBE)
global mqttClient
mqttClient = mqtt.Client()
mqttClient.on_connect = on_connect
mqttClient.on_message = on_message
mqttClient.on_publish = on_publish
mqttClient.username_pw_set(iotUtils.AUTH_TOKEN, password = "")
mqttClient.connect(MQTT_IP, MQTT_PORT, 60)
while True:
try:
mqttClient.connect(MQTT_IP, MQTT_PORT, 180)
print "MQTT_LISTENER: " + time.asctime(), "Connected to MQTT Broker - %s:%s" % (MQTT_IP, MQTT_PORT)
# Blocking call that processes network traffic, dispatches callbacks and
@ -91,25 +114,14 @@ def main():
# manual interface.
mqttClient.loop_forever()
# while True:
# try:
# mqttClient.connect(MQTT_IP, MQTT_PORT, 60)
# print "MQTT_LISTENER: " + time.asctime(), "Connected to MQTT Broker - %s:%s" % (MQTT_IP, MQTT_PORT)
#
# # Blocking call that processes network traffic, dispatches callbacks and
# # handles reconnecting.
# # Other loop*() functions are available that give a threaded interface and a
# # manual interface.
# mqttClient.loop_forever()
#
# except (KeyboardInterrupt, Exception) as e:
# print "MQTT_LISTENER: Exception in MQTTServerThread (either KeyboardInterrupt or Other)"
# print ("MQTT_LISTENER: " + str(e))
#
# mqttClient.disconnect()
# print "MQTT_LISTENER: " + time.asctime(), "Connection to Broker closed - %s:%s" % (MQTT_IP, MQTT_PORT)
# print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
# pass
except (KeyboardInterrupt, Exception) as e:
print "MQTT_LISTENER: Exception in MQTTServerThread (either KeyboardInterrupt or Other)"
print ("MQTT_LISTENER: " + str(e))
mqttClient.disconnect()
print "MQTT_LISTENER: " + time.asctime(), "Connection to Broker closed - %s:%s" % (MQTT_IP, MQTT_PORT)
print '~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
pass
if __name__ == '__main__':

@ -97,14 +97,26 @@ done
cp deviceConfig.properties ./src
if [ "$mode" = "N" ]; then
# Install RPi.GPIO Library for Accessing RPi GPIO Pins
sudo apt-get install rpi.gpio
# -----------------------------------------------------
# Install Adafruit_Python_DHT Library for reading DHT Sensor
git clone https://github.com/adafruit/Adafruit_Python_DHT.git
sudo apt-get install build-essential python-dev python-openssl
sudo python ./Adafruit_Python_DHT/setup.py install
cd ./Adafruit_Python_DHT
sudo python setup.py install
cd ..
# -----------------------------------------------------
# Install Paho-MQTT-Library for MQTT Communication
git clone https://github.com/eclipse/paho.mqtt.python.git
cd ./paho.mqtt.python
sudo python setup.py install
cd ..
# -----------------------------------------------------
fi
chmod +x ./src/RaspberryAgent.py
./src/RaspberryAgent.py -i $input -m $mode
sudo python ./src/RaspberryAgent.py -i $input -m $mode
if [ $? -ne 0 ]; then
echo "Could not start the service..."

Loading…
Cancel
Save