diff --git a/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/mqtt/PolicyPush.java b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/mqtt/PolicyPush.java new file mode 100644 index 0000000000..9fc4fddd9f --- /dev/null +++ b/components/device-mgt-iot/org.wso2.carbon.device.mgt.iot/src/main/java/org/wso2/carbon/device/mgt/iot/mqtt/PolicyPush.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.device.mgt.iot.mqtt; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +public class PolicyPush { + + private static final Log log = LogFactory.getLog(PolicyPush.class); + + public boolean pushToMQTT(String topic, String content, String broker, String clientId) { + + byte qos = 2; + MemoryPersistence persistence = new MemoryPersistence(); + + try { + MqttClient me = new MqttClient(broker, clientId, persistence); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + if (log.isDebugEnabled()) { + log.debug("Connecting to broker: " + broker); + } + me.connect(connOpts); + if (log.isDebugEnabled()) { + log.debug("Connected"); + log.debug("Publishing message: " + content); + } + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + me.publish(topic, message); + if (log.isDebugEnabled()) { + log.debug("Message published"); + } + me.disconnect(); + if (log.isDebugEnabled()) { + log.debug("Disconnected"); + } + return true; + } catch (MqttException ex) { + log.error("Error occurred when trying to publish to MQTT Queue", ex); + return false; + } + } +}