From b1d501d7edd9cb7fa6db02c5e430b0ea0c1b8ec8 Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Mon, 14 Aug 2023 22:52:06 +0530 Subject: [PATCH 1/2] improved mqtt publisher to support any topic --- .../adapter/mqtt/MQTTEventAdapterFactory.java | 6 ++ .../mqtt/util/MQTTAdapterPublisher.java | 59 ++++++++++--------- .../MQTTBrokerConnectionConfiguration.java | 14 ++++- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java index dbea53923..6e63d8169 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/MQTTEventAdapterFactory.java @@ -94,12 +94,18 @@ public class MQTTEventAdapterFactory extends OutputEventAdapterFactory { qos.setOptions(new String[]{"0", "1", "2"}); qos.setDefaultValue("2"); + // set topic + Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC); + topicProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC)); + topicProperty.setRequired(false); + staticPropertyList.add(brokerUrl); staticPropertyList.add(userName); staticPropertyList.add(scopes); staticPropertyList.add(clearSession); staticPropertyList.add(qos); staticPropertyList.add(password); + staticPropertyList.add(topicProperty); return staticPropertyList; } diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java index d352cf105..17ea163bb 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTAdapterPublisher.java @@ -17,37 +17,28 @@ */ package io.entgra.device.mgt.plugins.output.adapter.mqtt.util; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.DCRResponse; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenRequest; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenResponse; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.BadRequestException; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.KeyMgtException; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtService; +import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtServiceImpl; +import io.entgra.device.mgt.core.identity.jwt.client.extension.exception.JWTClientException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.ssl.Base64; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.message.BasicHeader; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.DCRResponse; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenRequest; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.TokenResponse; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.BadRequestException; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.exception.KeyMgtException; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtService; -import io.entgra.device.mgt.core.apimgt.keymgt.extension.service.KeyMgtServiceImpl; +import org.jetbrains.annotations.NotNull; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.event.output.adapter.core.exception.ConnectionUnavailableException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterRuntimeException; -import io.entgra.device.mgt.core.identity.jwt.client.extension.dto.AccessTokenInfo; -import io.entgra.device.mgt.core.identity.jwt.client.extension.exception.JWTClientException; -import io.entgra.device.mgt.core.identity.jwt.client.extension.service.JWTClientManagerService; import org.wso2.carbon.user.api.UserStoreException; /** @@ -153,6 +144,8 @@ public class MQTTAdapterPublisher { String dcrUrlString = this.mqttBrokerConnectionConfiguration.getDcrUrl(); if (dcrUrlString != null && !dcrUrlString.isEmpty()) { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { KeyMgtService keyMgtService = new KeyMgtServiceImpl(); String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX @@ -170,6 +163,8 @@ public class MQTTAdapterPublisher { } catch (KeyMgtException e) { log.error("Failed to create an application.", e); throw new OutputEventAdapterRuntimeException("Failed to create an application.", e); + } finally { + PrivilegedCarbonContext.endTenantFlow(); } } throw new OutputEventAdapterRuntimeException("Invalid configuration for mqtt publisher"); @@ -177,27 +172,33 @@ public class MQTTAdapterPublisher { private String getToken(String clientId, String clientSecret) throws UserStoreException, JWTClientException { - PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { - String scopes = mqttBrokerConnectionConfiguration.getScopes(); - scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation"; - - TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, - null, scopes.toString(), "client_credentials", null, - null, null, null, Integer.MAX_VALUE); + TokenRequest tokenRequest = getTokenRequest(clientId, clientSecret); KeyMgtService keyMgtService = new KeyMgtServiceImpl(); TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest); return tokenResponse.getAccessToken(); } catch (KeyMgtException | BadRequestException e) { log.error("Error while generating access token", e); - } finally { - PrivilegedCarbonContext.endTenantFlow(); } return null; } + @NotNull + private TokenRequest getTokenRequest(String clientId, String clientSecret) { + String scopes = mqttBrokerConnectionConfiguration.getScopes(); + scopes += " perm:topic:pub:" + tenantDomain + ":+:+:operation"; + + if (!StringUtils.isEmpty(mqttBrokerConnectionConfiguration.getTopic())) { + scopes += " perm:topic:pub:" + mqttBrokerConnectionConfiguration.getTopic().replace("/",":"); + } + + TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, + null, scopes.toString(), "client_credentials", null, + null, null, null, Integer.MAX_VALUE); + return tokenRequest; + } + private String getBase64Encode(String key, String value) { return new String(Base64.encodeBase64((key + ":" + value).getBytes())); } diff --git a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java index 4796483ac..576f111dd 100644 --- a/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java +++ b/components/extensions/cdmf-transport-adapters/output/io.entgra.device.mgt.plugins.output.adapter.mqtt/src/main/java/io/entgra/device/mgt/plugins/output/adapter/mqtt/util/MQTTBrokerConnectionConfiguration.java @@ -17,8 +17,8 @@ */ package io.entgra.device.mgt.plugins.output.adapter.mqtt.util; +import org.apache.commons.lang3.StringUtils; import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration; -import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException; import java.util.Map; @@ -36,6 +36,8 @@ public class MQTTBrokerConnectionConfiguration { private boolean globalCredentailSet; private int qos; + private String topic; + public String getTokenUrl() { return tokenUrl; } @@ -79,6 +81,11 @@ public class MQTTBrokerConnectionConfiguration { public int getQos() { return qos; } + + public String getTopic() { + return topic; + } + public MQTTBrokerConnectionConfiguration(OutputEventAdapterConfiguration eventAdapterConfiguration, Map globalProperties) { adapterName = eventAdapterConfiguration.getName(); @@ -123,7 +130,10 @@ public class MQTTBrokerConnectionConfiguration { this.qos = Integer.parseInt(qosVal); } - + String topic = eventAdapterConfiguration.getStaticProperties().get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC); + if (!StringUtils.isEmpty(topic)) { + this.topic = topic; + } } } From 8188fc6b8bf880559e19482bb663a743607227af Mon Sep 17 00:00:00 2001 From: "amalka.subasinghe" Date: Fri, 18 Aug 2023 09:48:53 +0530 Subject: [PATCH 2/2] added logger --- .../pom.xml | 5 ++++ .../mgt/plugins/emqx/exhook/ExServer.java | 25 +++++++++---------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml index 12c1253b8..be3202f4a 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml @@ -74,6 +74,11 @@ io.entgra.device.mgt.core.device.mgt.core provided + + org.ops4j.pax.logging + pax-logging-api + provided + diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java index 0e2233ee6..0d5d3d3a7 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java @@ -23,6 +23,12 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessageV3; +import io.entgra.device.mgt.core.device.mgt.common.DeviceIdentifier; +import io.entgra.device.mgt.core.device.mgt.common.EnrolmentInfo; +import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; +import io.entgra.device.mgt.core.device.mgt.core.config.DeviceConfigurationManager; +import io.entgra.device.mgt.core.device.mgt.core.config.keymanager.KeyManagerConfigurations; +import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; @@ -34,15 +40,8 @@ import org.apache.http.HttpStatus; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import io.entgra.device.mgt.core.device.mgt.core.config.DeviceConfigurationManager; -import io.entgra.device.mgt.core.device.mgt.core.config.keymanager.KeyManagerConfigurations; -import io.entgra.device.mgt.core.device.mgt.core.internal.DeviceManagementDataHolder; -import io.entgra.device.mgt.core.device.mgt.core.service.DeviceManagementProviderService; -import io.entgra.device.mgt.core.device.mgt.common.EnrolmentInfo; -import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; -import io.entgra.device.mgt.core.device.mgt.common.DeviceIdentifier; -import org.wso2.carbon.context.CarbonContext; import org.wso2.carbon.context.PrivilegedCarbonContext; + import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @@ -110,7 +109,7 @@ public class ExServer { public void DEBUG(String fn, Object req) { - System.out.printf(fn + ", request: " + req); + logger.debug(fn + ", request: " + req); } @Override @@ -493,8 +492,7 @@ public class ExServer { @Override public void onMessagePublish(MessagePublishRequest request, StreamObserver responseObserver) { - DEBUG("onMessagePublish", request); - + logger.info("onMessagePublish"); ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)"); Message nmsg = Message.newBuilder() @@ -538,7 +536,7 @@ public class ExServer { @Override public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver responseObserver) { - DEBUG("onMessageDelivered", request); + logger.info("onMessageDelivered"); EmptySuccess reply = EmptySuccess.newBuilder().build(); responseObserver.onNext(reply); responseObserver.onCompleted(); @@ -554,10 +552,11 @@ public class ExServer { @Override public void onMessageDropped(MessageDroppedRequest request, StreamObserver responseObserver) { - DEBUG("onMessageDropped", request); + logger.info("onMessageDropped ---------------------------------------------------------------"); EmptySuccess reply = EmptySuccess.newBuilder().build(); responseObserver.onNext(reply); responseObserver.onCompleted(); + }