forked from community/device-mgt-plugins
parent
fda220b3f6
commit
0e47fbc48a
62
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/RaspberryPiService.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/RaspberryPiControllerService.java
62
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/RaspberryPiService.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/RaspberryPiControllerService.java
2
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.mgt.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/dto/DeviceJSON.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/dto/DeviceJSON.java
2
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.mgt.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/dto/DeviceJSON.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/dto/DeviceJSON.java
2
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/exception/RaspberrypiException.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/exception/RaspberrypiException.java
2
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/exception/RaspberrypiException.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/exception/RaspberrypiException.java
@ -0,0 +1,179 @@
|
||||
/*
|
||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.transport;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||
import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
|
||||
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
|
||||
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttSubscriber;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl.util.RaspberrypiServiceUtils;
|
||||
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
|
||||
import org.wso2.carbon.device.mgt.iot.transport.TransportHandlerException;
|
||||
import org.wso2.carbon.device.mgt.iot.transport.mqtt.MQTTTransportHandler;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Calendar;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RaspberryPiMQTTConnector extends MQTTTransportHandler {
|
||||
private static Log log = LogFactory.getLog(RaspberryPiMQTTConnector.class);
|
||||
|
||||
private static final String serverName =
|
||||
DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName();
|
||||
|
||||
private static final String subscribeTopic =
|
||||
serverName + File.separator + "+" + File.separator + RaspberrypiConstants.DEVICE_TYPE +
|
||||
File.separator + "+" + File.separator + "publisher";
|
||||
|
||||
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
|
||||
|
||||
private RaspberryPiMQTTConnector() {
|
||||
super(iotServerSubscriber, RaspberrypiConstants.DEVICE_TYPE,
|
||||
MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect() {
|
||||
Runnable connector = new Runnable() {
|
||||
public void run() {
|
||||
while (!isConnected()) {
|
||||
try {
|
||||
String brokerUsername = MqttConfig.getInstance().getMqttQueueUsername();
|
||||
String brokerPassword = MqttConfig.getInstance().getMqttQueuePassword();
|
||||
setUsernameAndPassword(brokerUsername, brokerPassword);
|
||||
connectToQueue();
|
||||
} catch (TransportHandlerException e) {
|
||||
log.error("Connection to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e);
|
||||
try {
|
||||
Thread.sleep(timeoutInterval);
|
||||
} catch (InterruptedException ex) {
|
||||
log.error("MQTT-Connector: Thread Sleep Interrupt Exception.", ex);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
subscribeToQueue();
|
||||
} catch (TransportHandlerException e) {
|
||||
log.warn("Subscription to MQTT Broker at: " + mqttBrokerEndPoint + " failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread connectorThread = new Thread(connector);
|
||||
connectorThread.setDaemon(true);
|
||||
connectorThread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processIncomingMessage(MqttMessage message, String... messageParams) throws TransportHandlerException {
|
||||
String topic = messageParams[0];
|
||||
// owner and the deviceId are extracted from the MQTT topic to which the messgae was received.
|
||||
String ownerAndId = topic.replace(serverName + File.separator, "");
|
||||
ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":");
|
||||
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
|
||||
|
||||
String owner = ownerAndId.split(":")[0];
|
||||
String deviceId = ownerAndId.split(":")[1];
|
||||
String receivedMessage = message.toString();
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Received MQTT message for: [OWNER-" + owner + "] & [DEVICE.ID-" + deviceId + "]");
|
||||
log.debug("Message [" + receivedMessage + "] topic: [" + topic + "]");
|
||||
}
|
||||
|
||||
if (receivedMessage.contains("PUBLISHER")) {
|
||||
float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
|
||||
|
||||
if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) {
|
||||
log.error("MQTT Subscriber: Publishing data to DAS failed.");
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("MQTT Subscriber: Published data to DAS successfully.");
|
||||
}
|
||||
|
||||
} else if (receivedMessage.contains("TEMPERATURE")) {
|
||||
String temperatureValue = receivedMessage.split(":")[1];
|
||||
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
|
||||
temperatureValue,
|
||||
Calendar.getInstance().getTimeInMillis());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect() {
|
||||
Runnable stopConnection = new Runnable() {
|
||||
public void run() {
|
||||
while (isConnected()) {
|
||||
try {
|
||||
closeConnection();
|
||||
} catch (MqttException e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.warn("Unable to 'STOP' MQTT connection at broker at: " + mqttBrokerEndPoint
|
||||
+ " for device-type - " + RaspberrypiConstants.DEVICE_TYPE, e);
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(timeoutInterval);
|
||||
} catch (InterruptedException e1) {
|
||||
log.error("MQTT-Terminator: Thread Sleep Interrupt Exception at device-type - " +
|
||||
RaspberrypiConstants.DEVICE_TYPE, e1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread terminatorThread = new Thread(stopConnection);
|
||||
terminatorThread.setDaemon(true);
|
||||
terminatorThread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processIncomingMessage() throws TransportHandlerException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processIncomingMessage(MqttMessage message) throws TransportHandlerException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishDeviceData() throws TransportHandlerException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishDeviceData(MqttMessage publishData) throws TransportHandlerException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishDeviceData(String... publishData) throws TransportHandlerException {
|
||||
|
||||
}
|
||||
}
|
||||
|
4
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/util/RaspberrypiServiceUtils.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/util/RaspberrypiServiceUtils.java
4
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/util/RaspberrypiServiceUtils.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.controller.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/controller/service/impl/util/RaspberrypiServiceUtils.java
@ -1,36 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.carbon.device.mgt.iot.raspberrypi.service.dto;
|
||||
|
||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
@XmlRootElement
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class DeviceJSON {
|
||||
@XmlElement(required = true) public String owner;
|
||||
@XmlElement(required = true) public String deviceId;
|
||||
@XmlElement(required = true) public String reply;
|
||||
@XmlElement public Long time;
|
||||
@XmlElement public String key;
|
||||
@XmlElement public float value;
|
||||
}
|
@ -1,134 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.carbon.device.mgt.iot.raspberrypi.service.transport;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||
import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
|
||||
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
|
||||
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttSubscriber;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.service.util.RaspberrypiServiceUtils;
|
||||
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Calendar;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RaspberryPiMQTTSubscriber extends MqttSubscriber {
|
||||
private static Log log = LogFactory.getLog(RaspberryPiMQTTSubscriber.class);
|
||||
|
||||
private static final String serverName =
|
||||
DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName();
|
||||
private static final String subscribeTopic =
|
||||
serverName + File.separator + "+" + File.separator + RaspberrypiConstants.DEVICE_TYPE +
|
||||
File.separator + "+" + File.separator + "publisher";
|
||||
|
||||
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
|
||||
private String mqttEndpoint;
|
||||
|
||||
private RaspberryPiMQTTSubscriber() {
|
||||
super(iotServerSubscriber, RaspberrypiConstants.DEVICE_TYPE,
|
||||
MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic);
|
||||
}
|
||||
|
||||
public void initConnector() {
|
||||
mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
|
||||
}
|
||||
|
||||
public void connectAndSubscribe() {
|
||||
try {
|
||||
super.connectAndSubscribe();
|
||||
} catch (DeviceManagementException e) {
|
||||
log.error("Subscription to MQTT Broker at: " + mqttEndpoint + " failed");
|
||||
retryMQTTSubscription();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postMessageArrived(String topic, MqttMessage mqttMessage) {
|
||||
String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, "");
|
||||
ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":");
|
||||
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
|
||||
|
||||
String owner = ownerAndId.split(":")[0];
|
||||
String deviceId = ownerAndId.split(":")[1];
|
||||
String receivedMessage = mqttMessage.toString();
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
|
||||
log.debug("MQTT: Received Message [" + receivedMessage + "] topic: [" + topic + "]");
|
||||
}
|
||||
|
||||
if (receivedMessage.contains("PUBLISHER")) {
|
||||
float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
|
||||
|
||||
if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) {
|
||||
log.error("MQTT Subscriber: Publishing data to DAS failed.");
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("MQTT Subscriber: Published data to DAS successfully.");
|
||||
}
|
||||
|
||||
} else if (receivedMessage.contains("TEMPERATURE")) {
|
||||
String temperatureValue = receivedMessage.split(":")[1];
|
||||
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
|
||||
temperatureValue,
|
||||
Calendar.getInstance().getTimeInMillis());
|
||||
}
|
||||
}
|
||||
|
||||
private void retryMQTTSubscription() {
|
||||
Thread retryToSubscribe = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
if (!isConnected()) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Subscriber re-trying to reach MQTT queue....");
|
||||
}
|
||||
|
||||
try {
|
||||
RaspberryPiMQTTSubscriber.super.connectAndSubscribe();
|
||||
} catch (DeviceManagementException e1) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Attempt to re-connect to MQTT-Queue failed");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e1) {
|
||||
log.error("MQTT: Thread S;eep Interrupt Exception");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
retryToSubscribe.setDaemon(true);
|
||||
retryToSubscribe.start();
|
||||
}
|
||||
}
|
||||
|
59
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.mgt.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/RaspberryPiService.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.manager.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/manager/service/impl/RaspberryPiManagerService.java
59
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.mgt.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/service/RaspberryPiService.java → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.manager.service.impl/src/main/java/org/wso2/carbon/device/mgt/iot/raspberrypi/manager/service/impl/RaspberryPiManagerService.java
0
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.mgt.service.impl/src/main/webapp/META-INF/webapp-classloading.xml → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.manager.service.impl/src/main/webapp/META-INF/webapp-classloading.xml
0
components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.mgt.service.impl/src/main/webapp/META-INF/webapp-classloading.xml → components/device-mgt-iot-raspberrypi/org.wso2.carbon.device.mgt.iot.raspberrypi.manager.service.impl/src/main/webapp/META-INF/webapp-classloading.xml
@ -1,31 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.carbon.device.mgt.iot.raspberrypi.service.exception;
|
||||
|
||||
public class RaspberrypiException extends Exception {
|
||||
private static final long serialVersionUID = 118512086957330189L;
|
||||
|
||||
public RaspberrypiException(String errorMessage) {
|
||||
super(errorMessage);
|
||||
}
|
||||
|
||||
public RaspberrypiException(String errorMessage, Throwable throwable) {
|
||||
super(errorMessage, throwable);
|
||||
}
|
||||
}
|
@ -1,134 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.carbon.device.mgt.iot.raspberrypi.service.transport;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||
import org.wso2.carbon.device.mgt.iot.config.server.DeviceManagementConfigurationManager;
|
||||
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttConfig;
|
||||
import org.wso2.carbon.device.mgt.iot.controlqueue.mqtt.MqttSubscriber;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.service.util.RaspberrypiServiceUtils;
|
||||
import org.wso2.carbon.device.mgt.iot.sensormgt.SensorDataManager;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Calendar;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RaspberryPiMQTTSubscriber extends MqttSubscriber {
|
||||
private static Log log = LogFactory.getLog(RaspberryPiMQTTSubscriber.class);
|
||||
|
||||
private static final String serverName =
|
||||
DeviceManagementConfigurationManager.getInstance().getDeviceManagementServerInfo().getName();
|
||||
private static final String subscribeTopic =
|
||||
serverName + File.separator + "+" + File.separator + RaspberrypiConstants.DEVICE_TYPE +
|
||||
File.separator + "+" + File.separator + "publisher";
|
||||
|
||||
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
|
||||
private String mqttEndpoint;
|
||||
|
||||
private RaspberryPiMQTTSubscriber() {
|
||||
super(iotServerSubscriber, RaspberrypiConstants.DEVICE_TYPE,
|
||||
MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic);
|
||||
}
|
||||
|
||||
public void initConnector() {
|
||||
mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
|
||||
}
|
||||
|
||||
public void connectAndSubscribe() {
|
||||
try {
|
||||
super.connectAndSubscribe();
|
||||
} catch (DeviceManagementException e) {
|
||||
log.error("Subscription to MQTT Broker at: " + mqttEndpoint + " failed");
|
||||
retryMQTTSubscription();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postMessageArrived(String topic, MqttMessage mqttMessage) {
|
||||
String ownerAndId = topic.replace("wso2" + File.separator + "iot" + File.separator, "");
|
||||
ownerAndId = ownerAndId.replace(File.separator + RaspberrypiConstants.DEVICE_TYPE + File.separator, ":");
|
||||
ownerAndId = ownerAndId.replace(File.separator + "publisher", "");
|
||||
|
||||
String owner = ownerAndId.split(":")[0];
|
||||
String deviceId = ownerAndId.split(":")[1];
|
||||
String receivedMessage = mqttMessage.toString();
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Received MQTT message for: {OWNER-" + owner + "} & {DEVICE.ID-" + deviceId + "}");
|
||||
log.debug("MQTT: Received Message [" + receivedMessage + "] topic: [" + topic + "]");
|
||||
}
|
||||
|
||||
if (receivedMessage.contains("PUBLISHER")) {
|
||||
float temperature = Float.parseFloat(receivedMessage.split(":")[2]);
|
||||
|
||||
if (!RaspberrypiServiceUtils.publishToDAS(owner, deviceId, temperature)) {
|
||||
log.error("MQTT Subscriber: Publishing data to DAS failed.");
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("MQTT Subscriber: Published data to DAS successfully.");
|
||||
}
|
||||
|
||||
} else if (receivedMessage.contains("TEMPERATURE")) {
|
||||
String temperatureValue = receivedMessage.split(":")[1];
|
||||
SensorDataManager.getInstance().setSensorRecord(deviceId, RaspberrypiConstants.SENSOR_TEMPERATURE,
|
||||
temperatureValue,
|
||||
Calendar.getInstance().getTimeInMillis());
|
||||
}
|
||||
}
|
||||
|
||||
private void retryMQTTSubscription() {
|
||||
Thread retryToSubscribe = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
if (!isConnected()) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Subscriber re-trying to reach MQTT queue....");
|
||||
}
|
||||
|
||||
try {
|
||||
RaspberryPiMQTTSubscriber.super.connectAndSubscribe();
|
||||
} catch (DeviceManagementException e1) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Attempt to re-connect to MQTT-Queue failed");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e1) {
|
||||
log.error("MQTT: Thread S;eep Interrupt Exception");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
retryToSubscribe.setDaemon(true);
|
||||
retryToSubscribe.start();
|
||||
}
|
||||
}
|
||||
|
@ -1,234 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.wso2.carbon.device.mgt.iot.raspberrypi.service.util;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.concurrent.FutureCallback;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
import org.apache.http.impl.nio.client.HttpAsyncClients;
|
||||
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
|
||||
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
|
||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||
import org.wso2.carbon.device.mgt.iot.DeviceController;
|
||||
import org.wso2.carbon.device.mgt.iot.exception.DeviceControllerException;
|
||||
import org.wso2.carbon.device.mgt.iot.raspberrypi.plugin.constants.RaspberrypiConstants;
|
||||
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URL;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class RaspberrypiServiceUtils {
|
||||
private static final Log log = LogFactory.getLog(RaspberrypiServiceUtils.class);
|
||||
|
||||
//TODO; replace this tenant domain
|
||||
private static final String SUPER_TENANT = "carbon.super";
|
||||
private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature";
|
||||
|
||||
public static String sendCommandViaHTTP(final String deviceHTTPEndpoint, String urlContext,
|
||||
boolean fireAndForgot) throws DeviceManagementException {
|
||||
|
||||
String responseMsg = "";
|
||||
String urlString = RaspberrypiConstants.URL_PREFIX + deviceHTTPEndpoint + urlContext;
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(urlString);
|
||||
}
|
||||
|
||||
if (!fireAndForgot) {
|
||||
HttpURLConnection httpConnection = getHttpConnection(urlString);
|
||||
|
||||
try {
|
||||
httpConnection.setRequestMethod(HttpMethod.GET);
|
||||
} catch (ProtocolException e) {
|
||||
String errorMsg =
|
||||
"Protocol specific error occurred when trying to set method to GET" +
|
||||
" for:" + urlString;
|
||||
log.error(errorMsg);
|
||||
throw new DeviceManagementException(errorMsg, e);
|
||||
}
|
||||
|
||||
responseMsg = readResponseFromGetRequest(httpConnection);
|
||||
|
||||
} else {
|
||||
CloseableHttpAsyncClient httpclient = null;
|
||||
try {
|
||||
|
||||
httpclient = HttpAsyncClients.createDefault();
|
||||
httpclient.start();
|
||||
HttpGet request = new HttpGet(urlString);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Future<HttpResponse> future = httpclient.execute(
|
||||
request, new FutureCallback<HttpResponse>() {
|
||||
@Override
|
||||
public void completed(HttpResponse httpResponse) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception e) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelled() {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latch.await();
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Sync Interrupted");
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if (httpclient != null) {
|
||||
httpclient.close();
|
||||
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Failed on close");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return responseMsg;
|
||||
}
|
||||
|
||||
|
||||
public static boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource,
|
||||
String state) throws DeviceManagementException {
|
||||
|
||||
boolean result;
|
||||
DeviceController deviceController = new DeviceController();
|
||||
|
||||
try {
|
||||
result = deviceController.publishMqttControl(deviceOwner, RaspberrypiConstants.DEVICE_TYPE, deviceId, resource, state);
|
||||
} catch (DeviceControllerException e) {
|
||||
String errorMsg = "Error whilst trying to publish to MQTT Queue";
|
||||
log.error(errorMsg);
|
||||
throw new DeviceManagementException(errorMsg, e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------------------------
|
||||
Utility methods relevant to creating and sending http requests
|
||||
--------------------------------------------------------------------------------------- */
|
||||
|
||||
/* This methods creates and returns a http connection object */
|
||||
|
||||
public static HttpURLConnection getHttpConnection(String urlString) throws
|
||||
DeviceManagementException {
|
||||
|
||||
URL connectionUrl = null;
|
||||
HttpURLConnection httpConnection;
|
||||
|
||||
try {
|
||||
connectionUrl = new URL(urlString);
|
||||
httpConnection = (HttpURLConnection) connectionUrl.openConnection();
|
||||
} catch (MalformedURLException e) {
|
||||
String errorMsg =
|
||||
"Error occured whilst trying to form HTTP-URL from string: " + urlString;
|
||||
log.error(errorMsg);
|
||||
throw new DeviceManagementException(errorMsg, e);
|
||||
} catch (IOException e) {
|
||||
String errorMsg = "Error occured whilst trying to open a connection to: " +
|
||||
connectionUrl.toString();
|
||||
log.error(errorMsg);
|
||||
throw new DeviceManagementException(errorMsg, e);
|
||||
}
|
||||
|
||||
return httpConnection;
|
||||
}
|
||||
|
||||
/* This methods reads and returns the response from the connection */
|
||||
|
||||
public static String readResponseFromGetRequest(HttpURLConnection httpConnection)
|
||||
throws DeviceManagementException {
|
||||
BufferedReader bufferedReader;
|
||||
try {
|
||||
bufferedReader = new BufferedReader(new InputStreamReader(
|
||||
httpConnection.getInputStream()));
|
||||
} catch (IOException e) {
|
||||
String errorMsg =
|
||||
"There is an issue with connecting the reader to the input stream at: " +
|
||||
httpConnection.getURL();
|
||||
log.error(errorMsg);
|
||||
throw new DeviceManagementException(errorMsg, e);
|
||||
}
|
||||
|
||||
String responseLine;
|
||||
StringBuilder completeResponse = new StringBuilder();
|
||||
|
||||
try {
|
||||
while ((responseLine = bufferedReader.readLine()) != null) {
|
||||
completeResponse.append(responseLine);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
String errorMsg =
|
||||
"Error occured whilst trying read from the connection stream at: " +
|
||||
httpConnection.getURL();
|
||||
log.error(errorMsg);
|
||||
throw new DeviceManagementException(errorMsg, e);
|
||||
}
|
||||
try {
|
||||
bufferedReader.close();
|
||||
} catch (IOException e) {
|
||||
log.error(
|
||||
"Could not succesfully close the bufferedReader to the connection at: " +
|
||||
httpConnection.getURL());
|
||||
}
|
||||
|
||||
return completeResponse.toString();
|
||||
}
|
||||
|
||||
public static boolean publishToDAS(String owner, String deviceId, float temperature) {
|
||||
PrivilegedCarbonContext.startTenantFlow();
|
||||
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
|
||||
ctx.setTenantDomain(SUPER_TENANT, true);
|
||||
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx.getOSGiService(
|
||||
DeviceAnalyticsService.class, null);
|
||||
Object metdaData[] = {owner, RaspberrypiConstants.DEVICE_TYPE, deviceId, System.currentTimeMillis()};
|
||||
Object payloadData[] = {temperature};
|
||||
|
||||
try {
|
||||
deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0", metdaData, new Object[0], payloadData);
|
||||
} catch (DataPublisherConfigurationException e) {
|
||||
return false;
|
||||
} finally {
|
||||
PrivilegedCarbonContext.endTenantFlow();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
Loading…
Reference in new issue