Merge pull request 'improved mqtt publisher to support any topic' (#29) from amalka.subasinghe/device-mgt-plugins:master into master

Reviewed-on: community/device-mgt-plugins#29
apim420^2^2
commit 8b6b3dbc7b

@ -94,12 +94,18 @@ public class MQTTEventAdapterFactory extends OutputEventAdapterFactory {
qos.setOptions(new String[]{"0", "1", "2"}); qos.setOptions(new String[]{"0", "1", "2"});
qos.setDefaultValue("2"); qos.setDefaultValue("2");
// set topic
Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);
topicProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC));
topicProperty.setRequired(false);
staticPropertyList.add(brokerUrl); staticPropertyList.add(brokerUrl);
staticPropertyList.add(userName); staticPropertyList.add(userName);
staticPropertyList.add(scopes); staticPropertyList.add(scopes);
staticPropertyList.add(clearSession); staticPropertyList.add(clearSession);
staticPropertyList.add(qos); staticPropertyList.add(qos);
staticPropertyList.add(password); staticPropertyList.add(password);
staticPropertyList.add(topicProperty);
return staticPropertyList; return staticPropertyList;
} }

@ -17,37 +17,28 @@
*/ */
package io.entgra.device.mgt.plugins.output.adapter.mqtt.util; package io.entgra.device.mgt.plugins.output.adapter.mqtt.util;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.DCRResponse;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenRequest;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenResponse;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.BadRequestException;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.KeyMgtException;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtService;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtServiceImpl;
import io.entgra.device.mgt.core.identity.jwt.client.extension.exception.JWTClientException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.ssl.Base64; import org.apache.commons.ssl.Base64;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.json.simple.JSONObject; import org.jetbrains.annotations.NotNull;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.DCRResponse;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenRequest;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenResponse;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.BadRequestException;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.KeyMgtException;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtService;
import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtServiceImpl;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException; import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterRuntimeException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterRuntimeException;
import io.entgra.device.mgt.core.identity.jwt.client.extension.dto.AccessTokenInfo;
import io.entgra.device.mgt.core.identity.jwt.client.extension.exception.JWTClientException;
import io.entgra.device.mgt.core.identity.jwt.client.extension.service.JWTClientManagerService;
import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.user.api.UserStoreException;
/** /**
@ -153,6 +144,8 @@ public class MQTTAdapterPublisher {
String dcrUrlString = this.mqttBrokerConnectionConfiguration.getDcrUrl(); String dcrUrlString = this.mqttBrokerConnectionConfiguration.getDcrUrl();
if (dcrUrlString != null && !dcrUrlString.isEmpty()) { if (dcrUrlString != null && !dcrUrlString.isEmpty()) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try { try {
KeyMgtService keyMgtService = new KeyMgtServiceImpl(); KeyMgtService keyMgtService = new KeyMgtServiceImpl();
String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX
@ -170,6 +163,8 @@ public class MQTTAdapterPublisher {
} catch (KeyMgtException e) { } catch (KeyMgtException e) {
log.error("Failed to create an application.", e); log.error("Failed to create an application.", e);
throw new OutputEventAdapterRuntimeException("Failed to create an application.", e); throw new OutputEventAdapterRuntimeException("Failed to create an application.", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
} }
throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher"); throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher");
@ -177,27 +172,33 @@ public class MQTTAdapterPublisher {
private String getToken(String clientId, String clientSecret) private String getToken(String clientId, String clientSecret)
throws UserStoreException, JWTClientException { throws UserStoreException, JWTClientException {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true);
try { try {
String scopes = mqttBrokerConnectionConfiguration.getScopes(); TokenRequest tokenRequest = getTokenRequest(clientId, clientSecret);
scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation";
TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
null, scopes.toString(), "client_credentials", null,
null, null, null, Integer.MAX_VALUE);
KeyMgtService keyMgtService = new KeyMgtServiceImpl(); KeyMgtService keyMgtService = new KeyMgtServiceImpl();
TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest); TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest);
return tokenResponse.getAccessToken(); return tokenResponse.getAccessToken();
} catch (KeyMgtException | BadRequestException e) { } catch (KeyMgtException | BadRequestException e) {
log.error("Error while generating access token", e); log.error("Error while generating access token", e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
} }
return null; return null;
} }
@NotNull
private TokenRequest getTokenRequest(String clientId, String clientSecret) {
String scopes = mqttBrokerConnectionConfiguration.getScopes();
scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation";
if (!StringUtils.isEmpty(mqttBrokerConnectionConfiguration.getTopic())) {
scopes += " perm:topic:pub:" + mqttBrokerConnectionConfiguration.getTopic().replace("/",":");
}
TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret,
null, scopes.toString(), "client_credentials", null,
null, null, null, Integer.MAX_VALUE);
return tokenRequest;
}
private String getBase64Encode(String key, String value) { private String getBase64Encode(String key, String value) {
return new String(Base64.encodeBase64((key + ":" + value).getBytes())); return new String(Base64.encodeBase64((key + ":" + value).getBytes()));
} }

@ -17,8 +17,8 @@
*/ */
package io.entgra.device.mgt.plugins.output.adapter.mqtt.util; package io.entgra.device.mgt.plugins.output.adapter.mqtt.util;
import org.apache.commons.lang3.StringUtils;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import java.util.Map; import java.util.Map;
@ -36,6 +36,8 @@ public class MQTTBrokerConnectionConfiguration {
private boolean globalCredentailSet; private boolean globalCredentailSet;
private int qos; private int qos;
private String topic;
public String getTokenUrl() { public String getTokenUrl() {
return tokenUrl; return tokenUrl;
} }
@ -79,6 +81,11 @@ public class MQTTBrokerConnectionConfiguration {
public int getQos() { public int getQos() {
return qos; return qos;
} }
public String getTopic() {
return topic;
}
public MQTTBrokerConnectionConfiguration(OutputEventAdapterConfiguration eventAdapterConfiguration, public MQTTBrokerConnectionConfiguration(OutputEventAdapterConfiguration eventAdapterConfiguration,
Map<String, String> globalProperties) { Map<String, String> globalProperties) {
adapterName = eventAdapterConfiguration.getName(); adapterName = eventAdapterConfiguration.getName();
@ -123,7 +130,10 @@ public class MQTTBrokerConnectionConfiguration {
this.qos = Integer.parseInt(qosVal); this.qos = Integer.parseInt(qosVal);
} }
String topic = eventAdapterConfiguration.getStaticProperties().get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);
if (!StringUtils.isEmpty(topic)) {
this.topic = topic;
}
} }
} }

@ -74,6 +74,11 @@
<artifactId>io.entgra.device.mgt.core.device.mgt.core</artifactId> <artifactId>io.entgra.device.mgt.core.device.mgt.core</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.ops4j.pax.logging</groupId>
<artifactId>pax-logging-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<extensions> <extensions>

@ -23,6 +23,12 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.GeneratedMessageV3;
import io.entgra.device.mgt.core.device.mgt.common.DeviceIdentifier;
import io.entgra.device.mgt.core.device.mgt.common.EnrolmentInfo;
import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException;
import io.entgra.device.mgt.core.device.mgt.core.config.DeviceConfigurationManager;
import io.entgra.device.mgt.core.device.mgt.core.config.keymanager.KeyManagerConfigurations;
import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
@ -34,15 +40,8 @@ import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity; import org.apache.http.entity.StringEntity;
import io.entgra.device.mgt.core.device.mgt.core.config.DeviceConfigurationManager;
import io.entgra.device.mgt.core.device.mgt.core.config.keymanager.KeyManagerConfigurations;
import io.entgra.device.mgt.core.device.mgt.core.internal.DeviceManagementDataHolder;
import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService;
import io.entgra.device.mgt.core.device.mgt.common.EnrolmentInfo;
import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException;
import io.entgra.device.mgt.core.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -110,7 +109,7 @@ public class ExServer {
public void DEBUG(String fn, Object req) { public void DEBUG(String fn, Object req) {
System.out.printf(fn + ", request: " + req); logger.debug(fn + ", request: " + req);
} }
@Override @Override
@ -493,8 +492,7 @@ public class ExServer {
@Override @Override
public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) { public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) {
DEBUG("onMessagePublish", request); logger.info("onMessagePublish");
ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)"); ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)");
Message nmsg = Message.newBuilder() Message nmsg = Message.newBuilder()
@ -538,7 +536,7 @@ public class ExServer {
@Override @Override
public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver<EmptySuccess> responseObserver) { public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onMessageDelivered", request); logger.info("onMessageDelivered");
EmptySuccess reply = EmptySuccess.newBuilder().build(); EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply); responseObserver.onNext(reply);
responseObserver.onCompleted(); responseObserver.onCompleted();
@ -554,10 +552,11 @@ public class ExServer {
@Override @Override
public void onMessageDropped(MessageDroppedRequest request, StreamObserver<EmptySuccess> responseObserver) { public void onMessageDropped(MessageDroppedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onMessageDropped", request); logger.info("onMessageDropped ---------------------------------------------------------------");
EmptySuccess reply = EmptySuccess.newBuilder().build(); EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply); responseObserver.onNext(reply);
responseObserver.onCompleted(); responseObserver.onCompleted();
} }

Loading…
Cancel
Save