From 11785fa4bbc6aeb84fc2f3a851e8581f61df7fd4 Mon Sep 17 00:00:00 2001 From: Amalka Subasinghe Date: Sat, 18 Mar 2023 00:15:45 +0530 Subject: [PATCH 1/4] emqx exhook and initializer --- .../pom.xml | 130 +++++ .../mgt/plugins/emqx/exhook/ExServer.java | 477 ++++++++++++++++++ .../plugins/emqx/exhook/HandlerConstants.java | 28 + .../mgt/plugins/emqx/exhook/HandlerUtil.java | 280 ++++++++++ .../plugins/emqx/exhook/ProxyResponse.java | 63 +++ .../src/main/proto/exhook.proto | 434 ++++++++++++++++ .../pom.xml | 111 ++++ .../initializer/EmqxExhookDataHolder.java | 26 + .../EmqxExhookServiceComponent.java | 62 +++ .../internal/EmqxExhookInitializer.java | 71 +++ components/extensions/emqx-extensions/pom.xml | 39 ++ components/extensions/pom.xml | 1 + pom.xml | 54 +- 13 files changed, 1775 insertions(+), 1 deletion(-) create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java create mode 100644 components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java create mode 100644 components/extensions/emqx-extensions/pom.xml diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml new file mode 100644 index 0000000000..8b80bc0c21 --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml @@ -0,0 +1,130 @@ + + + + + + org.wso2.carbon.devicemgt-plugins + emqx-extensions + 6.0.13-SNAPSHOT + ../pom.xml + + + 4.0.0 + io.entgra.device.mgt.plugins.emqx.exhook + jar + EMQX Extensions - Extension Hook + + + + + io.grpc + grpc-netty-shaded + + + io.grpc + grpc-protobuf + + + io.grpc + grpc-stub + + + org.apache.tomcat + annotations-api + provided + + + org.apache.httpcomponents + httpclient + provided + + + com.google.code.gson + gson + provided + + + org.wso2.carbon + org.wso2.carbon.core + provided + + + commons-lang.wso2 + commons-lang + provided + + + org.wso2.carbon.devicemgt + org.wso2.carbon.device.mgt.core + provided + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + io.entgra.device.mgt.plugins.emqx.exhook.ExServer + + + + jar-with-dependencies + + + + + assemble-all + package + + single + + + + + + + \ No newline at end of file diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java new file mode 100644 index 0000000000..7fe87ca9bc --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java @@ -0,0 +1,477 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.entgra.device.mgt.plugins.emqx.exhook; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessageV3; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; +import org.wso2.carbon.device.mgt.core.config.keymanager.KeyManagerConfigurations; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class ExServer { + private static final Log logger = LogFactory.getLog(ExServer.class.getName()); + + private static Map accessTokenMap = new HashMap<>(); + private static Map authorizedScopeMap = new HashMap<>(); + private Server server; + + public ExServer() { + } + + public void start() throws IOException { + /* The port on which the server should run */ + int port = 9000; + + server = ServerBuilder.forPort(port) + .addService(new HookProviderImpl()) + .build() + .start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + ExServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + } + + public void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + public void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + final ExServer server = new ExServer(); + server.start(); + server.blockUntilShutdown(); + } + + static class HookProviderImpl extends HookProviderGrpc.HookProviderImplBase { + + + + public void DEBUG(String fn, Object req) { + System.out.printf(fn + ", request: " + req); + } + + @Override + public void onProviderLoaded(ProviderLoadedRequest request, StreamObserver responseObserver) { + DEBUG("onProviderLoaded", request); + HookSpec[] specs = { + HookSpec.newBuilder().setName("client.connect").build(), + HookSpec.newBuilder().setName("client.connack").build(), + HookSpec.newBuilder().setName("client.connected").build(), + HookSpec.newBuilder().setName("client.disconnected").build(), + HookSpec.newBuilder().setName("client.authenticate").build(), + HookSpec.newBuilder().setName("client.check_acl").build(), + HookSpec.newBuilder().setName("client.subscribe").build(), + HookSpec.newBuilder().setName("client.unsubscribe").build(), + + HookSpec.newBuilder().setName("session.created").build(), + HookSpec.newBuilder().setName("session.subscribed").build(), + HookSpec.newBuilder().setName("session.unsubscribed").build(), + HookSpec.newBuilder().setName("session.resumed").build(), + HookSpec.newBuilder().setName("session.discarded").build(), + HookSpec.newBuilder().setName("session.takeovered").build(), + HookSpec.newBuilder().setName("session.terminated").build(), + + HookSpec.newBuilder().setName("message.publish").build(), + HookSpec.newBuilder().setName("message.delivered").build(), + HookSpec.newBuilder().setName("message.acked").build(), + HookSpec.newBuilder().setName("message.dropped").build() + }; + LoadedResponse reply = LoadedResponse.newBuilder().addAllHooks(Arrays.asList(specs)).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onProviderUnloaded(ProviderUnloadedRequest request, StreamObserver responseObserver) { + DEBUG("onProviderUnloaded", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onClientConnect(ClientConnectRequest request, StreamObserver responseObserver) { + logger.info("onClientConnect -----------------------------"); + DEBUG("onClientConnect", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onClientConnack(ClientConnackRequest request, StreamObserver responseObserver) { + DEBUG("onClientConnack", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onClientConnected(ClientConnectedRequest request, StreamObserver responseObserver) { + DEBUG("onClientConnected", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onClientDisconnected(ClientDisconnectedRequest request, StreamObserver responseObserver) { + logger.info("onClientDisconnected -----------------------------"); + DEBUG("onClientDisconnected", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onClientAuthenticate(ClientAuthenticateRequest request, StreamObserver responseObserver) { + DEBUG("onClientAuthenticate", request); + + if (!StringUtils.isEmpty(request.getClientinfo().getUsername()) && + !StringUtils.isEmpty(request.getClientinfo().getPassword())) { + + DEBUG("on access token passes", request); + try { + String accessToken = request.getClientinfo().getUsername() + "-" + request.getClientinfo().getPassword(); + KeyManagerConfigurations keyManagerConfig = DeviceConfigurationManager.getInstance() + .getDeviceManagementConfig().getKeyManagerConfigurations(); + + HttpPost tokenEndpoint = new HttpPost( + keyManagerConfig.getServerUrl() + HandlerConstants.INTROSPECT_ENDPOINT); + tokenEndpoint.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString()); + tokenEndpoint.setHeader(HttpHeaders.AUTHORIZATION, HandlerConstants.BASIC + Base64.getEncoder() + .encodeToString((keyManagerConfig.getAdminUsername() + HandlerConstants.COLON + + keyManagerConfig.getAdminPassword()).getBytes())); + StringEntity tokenEPPayload = new StringEntity("token=" + accessToken, + ContentType.APPLICATION_FORM_URLENCODED); + tokenEndpoint.setEntity(tokenEPPayload); + ProxyResponse tokenStatus = HandlerUtil.execute(tokenEndpoint); + + if (tokenStatus.getExecutorResponse().contains(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX)) { + if (tokenStatus.getCode() == HttpStatus.SC_UNAUTHORIZED) { + // return with error + logger.error("Unauthorized"); + responseObserver.onError(new Exception("unauthorized")); + return; + } else { + // return with error + logger.error("error occurred while checking access token"); + responseObserver.onError(new Exception("error occurred while checking access token")); + return; + } + } + + String tokenData = tokenStatus.getData(); + if (tokenData == null) { + // return with error + logger.error("invalid token data is received"); + responseObserver.onError(new Exception("invalid token data is received")); + return; + } + JsonParser jsonParser = new JsonParser(); + JsonElement jTokenResult = jsonParser.parse(tokenData); + if (jTokenResult.isJsonObject()) { + JsonObject jTokenResultAsJsonObject = jTokenResult.getAsJsonObject(); + if (!jTokenResultAsJsonObject.get("active").getAsBoolean()) { + logger.error("access token is expired"); + responseObserver.onError(new Exception("access token is expired")); + return; + } + // success + accessTokenMap.put(request.getClientinfo().getClientid(), accessToken); + authorizedScopeMap.put(accessToken, jTokenResultAsJsonObject.get("scope").getAsString()); + logger.info("authenticated"); + ValuedResponse reply = ValuedResponse.newBuilder() + .setBoolResult(true) + .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } +// } else { +// ValuedResponse reply = ValuedResponse.newBuilder() +// .setBoolResult(true) +// .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) +// .build(); +// responseObserver.onNext(reply); +// responseObserver.onCompleted(); + } + } + + @Override + public void onClientCheckAcl(ClientCheckAclRequest request, StreamObserver responseObserver) { + DEBUG("onClientCheckAcl", request); + /* + carbon.super/deviceType/deviceId + data/carbon.super/deviceType/deviceId + data/carbonsuper/deviceType/deviceId + republished/deviceType + */ + if (!StringUtils.isEmpty(request.getClientinfo().getUsername()) && + StringUtils.isEmpty(request.getClientinfo().getPassword())) { + //todo: check token validity + String accessToken = accessTokenMap.get(request.getClientinfo().getClientid()); + if (StringUtils.isEmpty(accessToken) || !accessToken.startsWith(request.getClientinfo().getUsername())) { + logger.info("Valid access token not found"); + responseObserver.onError(new Exception("not authorized")); + } + + String authorizedScopeList = authorizedScopeMap.get(accessToken); + String[] scopeArray = authorizedScopeList.split(" "); + List scopeList = Arrays.asList(scopeArray); + boolean isFound = false; + + String tempScope = null; + if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE) || + request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) { + + String requestTopic = request.getTopic(); + + if (requestTopic.endsWith("/#")) { + requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#")); + } + + // replace / with : + requestTopic = requestTopic.replace("/", ":"); + + if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) { + tempScope = "perm:topic:sub:" + requestTopic; + } + if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) { + tempScope = "perm:topic:pub:" + requestTopic; + } + + if (scopeList.contains(tempScope)) { + isFound = true; + } + + if (isFound) { + ValuedResponse reply = ValuedResponse.newBuilder() + .setBoolResult(true) + .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) + .build(); + + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } else { + responseObserver.onError(new Exception("not authorized")); + } + } + } else { + //default + ValuedResponse reply = ValuedResponse.newBuilder() + .setBoolResult(true) + .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) + .build(); + + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + } + + @Override + public void onClientSubscribe(ClientSubscribeRequest request, StreamObserver responseObserver) { + DEBUG("onClientSubscribe", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onClientUnsubscribe(ClientUnsubscribeRequest request, StreamObserver responseObserver) { + DEBUG("onClientUnsubscribe", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionCreated(SessionCreatedRequest request, StreamObserver responseObserver) { + DEBUG("onSessionCreated", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionSubscribed(SessionSubscribedRequest request, StreamObserver responseObserver) { + DEBUG("onSessionSubscribed", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionUnsubscribed(SessionUnsubscribedRequest request, StreamObserver responseObserver) { + DEBUG("onSessionUnsubscribed", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionResumed(SessionResumedRequest request, StreamObserver responseObserver) { + DEBUG("onSessionResumed", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionDiscarded(SessionDiscardedRequest request, StreamObserver responseObserver) { + DEBUG("onSessionDdiscarded", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionTakeovered(SessionTakeoveredRequest request, StreamObserver responseObserver) { + DEBUG("onSessionTakeovered", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onSessionTerminated(SessionTerminatedRequest request, StreamObserver responseObserver) { + DEBUG("onSessionTerminated", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onMessagePublish(MessagePublishRequest request, StreamObserver responseObserver) { + DEBUG("onMessagePublish", request); + + ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)"); + + Message nmsg = Message.newBuilder() + .setId (request.getMessage().getId()) + .setNode (request.getMessage().getNode()) + .setFrom (request.getMessage().getFrom()) + .setTopic (request.getMessage().getTopic()) + .setPayload(((GeneratedMessageV3) request).toByteString()).build(); + + + ValuedResponse reply = ValuedResponse.newBuilder() + .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) + .setMessage(nmsg).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + +// case2: stop publish the 't/d' messages +// @Override +// public void onMessagePublish(MessagePublishRequest request, StreamObserver responseObserver) { +// DEBUG("onMessagePublish", request); +// +// Message nmsg = request.getMessage(); +// if ("t/d".equals(nmsg.getTopic())) { +// ByteString bstr = ByteString.copyFromUtf8(""); +// nmsg = Message.newBuilder() +// .setId (request.getMessage().getId()) +// .setNode (request.getMessage().getNode()) +// .setFrom (request.getMessage().getFrom()) +// .setTopic (request.getMessage().getTopic()) +// .setPayload(bstr) +// .putHeaders("allow_publish", "false").build(); +// } +// +// ValuedResponse reply = ValuedResponse.newBuilder() +// .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN) +// .setMessage(nmsg).build(); +// responseObserver.onNext(reply); +// responseObserver.onCompleted(); +// } + + @Override + public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver responseObserver) { + DEBUG("onMessageDelivered", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onMessageAcked(MessageAckedRequest request, StreamObserver responseObserver) { + DEBUG("onMessageAcked", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void onMessageDropped(MessageDroppedRequest request, StreamObserver responseObserver) { + DEBUG("onMessageDropped", request); + EmptySuccess reply = EmptySuccess.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + + } + + +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java new file mode 100644 index 0000000000..aa7b09623b --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.entgra.device.mgt.plugins.emqx.exhook; + +public class HandlerConstants { + public static final String INTROSPECT_ENDPOINT = "/oauth2/introspect"; + public static final String BASIC = "Basic "; + public static final String EXECUTOR_EXCEPTION_PREFIX = "ExecutorException-"; + public static final String TOKEN_IS_EXPIRED = "ACCESS_TOKEN_IS_EXPIRED"; + public static final String COLON = ":"; + public static final int INTERNAL_ERROR_CODE = 500; +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java new file mode 100644 index 0000000000..b0c3485cb7 --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.entgra.device.mgt.plugins.emqx.exhook; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.*; +import java.security.*; +import java.security.cert.CertificateException; + +public class HandlerUtil { + + private static KeyStore keyStore; + private static KeyStore trustStore; + private static char[] keyStorePassword; + private static SSLContext sslContext; + + private static final String KEY_STORE_TYPE = "JKS"; + /** + * Default truststore type of the client + */ + private static final String TRUST_STORE_TYPE = "JKS"; + + private static final String KEY_MANAGER_TYPE = "SunX509"; //Default Key Manager Type + /** + * Default trustmanager type of the client + */ + private static final String TRUST_MANAGER_TYPE = "SunX509"; //Default Trust Manager Type + + private static final String SSLV3 = "SSLv3"; + + private static final Log log = LogFactory.getLog(HandlerUtil.class); + + /*** + * + * @param httpRequest - httpMethod e.g:- HttpPost, HttpGet + * @return response as string + * @throws IOException IO exception returns if error occurs when executing the httpMethod + */ + public static ProxyResponse execute(HttpRequestBase httpRequest) throws IOException { + try (CloseableHttpClient client = getHttpClient()) { + HttpResponse response = client.execute(httpRequest); + ProxyResponse proxyResponse = new ProxyResponse(); + + if (response == null) { + log.error("Received null response for http request : " + httpRequest.getMethod() + " " + httpRequest + .getURI().toString()); + proxyResponse.setCode(HandlerConstants.INTERNAL_ERROR_CODE); + proxyResponse.setStatus(ProxyResponse.Status.ERROR); + proxyResponse.setExecutorResponse(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey( + HandlerConstants.INTERNAL_ERROR_CODE)); + return proxyResponse; + } else { + int statusCode = response.getStatusLine().getStatusCode(); + String jsonString = getResponseString(response); + if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_CREATED) { + proxyResponse.setCode(statusCode); + proxyResponse.setData(jsonString); + proxyResponse.setStatus(ProxyResponse.Status.SUCCESS); + proxyResponse.setExecutorResponse("SUCCESS"); + proxyResponse.setHeaders(response.getAllHeaders()); + return proxyResponse; + } else if (statusCode == HttpStatus.SC_UNAUTHORIZED) { + if (isTokenExpired(jsonString)) { + proxyResponse.setCode(statusCode); + proxyResponse.setStatus(ProxyResponse.Status.ERROR); + proxyResponse.setExecutorResponse(HandlerConstants.TOKEN_IS_EXPIRED); + } else { + log.error( + "Received " + statusCode + " response for http request : " + httpRequest.getMethod() + + " " + httpRequest.getURI().toString() + ". Error message: " + jsonString); + proxyResponse.setCode(statusCode); + proxyResponse.setData(jsonString); + proxyResponse.setStatus(ProxyResponse.Status.ERROR); + proxyResponse.setExecutorResponse( + HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(statusCode)); + } + return proxyResponse; + } + log.error("Received " + statusCode + + " response for http request : " + httpRequest.getMethod() + " " + httpRequest.getURI() + .toString() + ". Error message: " + jsonString); + proxyResponse.setCode(statusCode); + proxyResponse.setData(jsonString); + proxyResponse.setStatus(ProxyResponse.Status.ERROR); + proxyResponse + .setExecutorResponse(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(statusCode)); + return proxyResponse; + } + } + } + + public static boolean isTokenExpired(String jsonBody) { + return jsonBody.contains("Access token expired") || jsonBody + .contains("Invalid input. Access token validation failed"); + } + + /*** + * + * @param statusCode Provide status code, e.g:- 400, 401, 500 etc + * @return relative status code key for given status code. + */ + public static String getStatusKey(int statusCode) { + String statusCodeKey; + + switch (statusCode) { + case HttpStatus.SC_INTERNAL_SERVER_ERROR: + statusCodeKey = "internalServerError"; + break; + case HttpStatus.SC_BAD_REQUEST: + statusCodeKey = "badRequest"; + break; + case HttpStatus.SC_UNAUTHORIZED: + statusCodeKey = "unauthorized"; + break; + case HttpStatus.SC_FORBIDDEN: + statusCodeKey = "forbidden"; + break; + case HttpStatus.SC_NOT_FOUND: + statusCodeKey = "notFound"; + break; + case HttpStatus.SC_METHOD_NOT_ALLOWED: + statusCodeKey = "methodNotAllowed"; + break; + case HttpStatus.SC_NOT_ACCEPTABLE: + statusCodeKey = "notAcceptable"; + break; + case HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE: + statusCodeKey = "unsupportedMediaType"; + break; + default: + statusCodeKey = "defaultPage"; + break; + } + return statusCodeKey; + } + + /** + * Retrieve Http client based on hostname verification. + * + * @return {@link CloseableHttpClient} http client + */ + public static CloseableHttpClient getHttpClient() { + + boolean isIgnoreHostnameVerification = Boolean.parseBoolean(System. + getProperty("org.wso2.ignoreHostnameVerification")); + if (isIgnoreHostnameVerification) { + return HttpClients.custom().setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build(); + } else { + return HttpClients.createDefault(); + } + +// String keyStorePassword = "wso2carbon"; +// String trustStorePassword = "wso2carbon"; +// String keyStoreLocation = "/home/amalka/entgra/source/product-switchgear/switchgear-plugin/integration-test/distro-prep/target/entgra-iot-pro-switchgear-repacked-1.0.0-SNAPSHOT/repository/resources/security/wso2carbon.jks"; +// String trustStoreLocation = "/home/amalka/entgra/source/product-switchgear/switchgear-plugin/integration-test/distro-prep/target/entgra-iot-pro-switchgear-repacked-1.0.0-SNAPSHOT/repository/resources/security/client-truststore.jks"; +// +// //Call to load the keystore. +// try { +// loadKeyStore(keyStoreLocation, keyStorePassword); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } catch (CertificateException e) { +// throw new RuntimeException(e); +// } catch (NoSuchAlgorithmException e) { +// throw new RuntimeException(e); +// } catch (KeyStoreException e) { +// throw new RuntimeException(e); +// } +// //Call to load the TrustStore. +// try { +// loadTrustStore(trustStoreLocation, trustStorePassword); +// } catch (KeyStoreException e) { +// throw new RuntimeException(e); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } catch (CertificateException e) { +// throw new RuntimeException(e); +// } catch (NoSuchAlgorithmException e) { +// throw new RuntimeException(e); +// } +// //Create the SSL context with the loaded TrustStore/keystore. +// try { +// initSSLConnection(); +// } catch (NoSuchAlgorithmException e) { +// throw new RuntimeException(e); +// } catch (UnrecoverableKeyException e) { +// throw new RuntimeException(e); +// } catch (KeyStoreException e) { +// throw new RuntimeException(e); +// } catch (KeyManagementException e) { +// throw new RuntimeException(e); +// } +// +// return HttpClients.createDefault(); + + } + + private static void loadKeyStore(String keyStorePath, String ksPassword) + throws IOException, CertificateException, NoSuchAlgorithmException, KeyStoreException { + InputStream fis = null; + try { + keyStorePassword = ksPassword.toCharArray(); + keyStore = KeyStore.getInstance(KEY_STORE_TYPE); + fis = new FileInputStream(keyStorePath); + keyStore.load(fis, keyStorePassword); + + } finally { + if (fis != null) { + fis.close(); + } + } + } + + private static void loadTrustStore(String trustStorePath, String tsPassword) + throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException { + + InputStream fis = null; + try { + trustStore = KeyStore.getInstance(TRUST_STORE_TYPE); + fis = new FileInputStream(trustStorePath); + trustStore.load(fis, tsPassword.toCharArray()); + } finally { + if (fis != null) { + fis.close(); + } + } + } + + private static void initSSLConnection() throws NoSuchAlgorithmException, UnrecoverableKeyException, + KeyStoreException, KeyManagementException { + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KEY_MANAGER_TYPE); + keyManagerFactory.init(keyStore, keyStorePassword); + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TRUST_MANAGER_TYPE); + trustManagerFactory.init(trustStore); + + // Create and initialize SSLContext for HTTPS communication + sslContext = SSLContext.getInstance(SSLV3); + sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null); + SSLContext.setDefault(sslContext); + } + + public static String getResponseString(HttpResponse response) throws IOException { + try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) { + StringBuilder responseBuilder = new StringBuilder(); + String line; + while ((line = rd.readLine()) != null) { + responseBuilder.append(line); + } + return responseBuilder.toString(); + } + } + +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java new file mode 100644 index 0000000000..1eadf8f200 --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.entgra.device.mgt.plugins.emqx.exhook; + +import org.apache.http.Header; + +public class ProxyResponse { + + public static class Status { + public static int SUCCESS = 1; + public static int ERROR = 0; + } + + private int code; + private String data; + private String executorResponse; + private int status; + private Header[] headers; + + public int getCode() { return code; } + + public void setCode(int code) { this.code = code; } + + public String getData() { return data; } + + public void setData(String data) { this.data = data; } + + public String getExecutorResponse() { return executorResponse; } + + public void setExecutorResponse(String executorResponse) { this.executorResponse = executorResponse; } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public Header[] getHeaders() { + return headers; + } + + public void setHeaders(Header[] headers) { + this.headers = headers; + } + +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto new file mode 100644 index 0000000000..0e7cab7644 --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto @@ -0,0 +1,434 @@ +// +// Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. +// +// Entgra (Pvt) Ltd. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +syntax = "proto3"; + +option csharp_namespace = "Emqx.Exhook.V1"; +option go_package = "emqx.io/grpc/exhook"; +option java_multiple_files = true; +option java_package = "io.entgra.device.mgt.plugins.emqx.exhook"; +option java_outer_classname = "EmqxExHookProto"; + +package emqx.exhook.v1; + +service HookProvider { + + rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; + + rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; + + rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; + + rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; + + rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; + + rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {}; + + rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; + + rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; + + rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; + + rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; + + rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; + + rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; + + rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; + + rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; + + rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; + + rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; +} + +//------------------------------------------------------------------------------ +// Request & Response +//------------------------------------------------------------------------------ + +message ProviderLoadedRequest { + + BrokerInfo broker = 1; +} + +message LoadedResponse { + + repeated HookSpec hooks = 1; +} + +message ProviderUnloadedRequest { } + +message ClientConnectRequest { + + ConnInfo conninfo = 1; + + // MQTT CONNECT packet's properties (MQTT v5.0) + // + // It should be empty on MQTT v3.1.1/v3.1 or others protocol + repeated Property props = 2; +} + +message ClientConnackRequest { + + ConnInfo conninfo = 1; + + string result_code = 2; + + repeated Property props = 3; +} + +message ClientConnectedRequest { + + ClientInfo clientinfo = 1; +} + +message ClientDisconnectedRequest { + + ClientInfo clientinfo = 1; + + string reason = 2; +} + +message ClientAuthenticateRequest { + + ClientInfo clientinfo = 1; + + bool result = 2; +} + +message ClientCheckAclRequest { + + ClientInfo clientinfo = 1; + + enum AclReqType { + + PUBLISH = 0; + + SUBSCRIBE = 1; + } + + AclReqType type = 2; + + string topic = 3; + + bool result = 4; +} + +message ClientSubscribeRequest { + + ClientInfo clientinfo = 1; + + repeated Property props = 2; + + repeated TopicFilter topic_filters = 3; +} + +message ClientUnsubscribeRequest { + + ClientInfo clientinfo = 1; + + repeated Property props = 2; + + repeated TopicFilter topic_filters = 3; +} + +message SessionCreatedRequest { + + ClientInfo clientinfo = 1; +} + +message SessionSubscribedRequest { + + ClientInfo clientinfo = 1; + + string topic = 2; + + SubOpts subopts = 3; +} + +message SessionUnsubscribedRequest { + + ClientInfo clientinfo = 1; + + string topic = 2; +} + +message SessionResumedRequest { + + ClientInfo clientinfo = 1; +} + +message SessionDiscardedRequest { + + ClientInfo clientinfo = 1; +} + +message SessionTakeoveredRequest { + + ClientInfo clientinfo = 1; +} + +message SessionTerminatedRequest { + + ClientInfo clientinfo = 1; + + string reason = 2; +} + +message MessagePublishRequest { + + Message message = 1; +} + +message MessageDeliveredRequest { + + ClientInfo clientinfo = 1; + + Message message = 2; +} + +message MessageDroppedRequest { + + Message message = 1; + + string reason = 2; +} + +message MessageAckedRequest { + + ClientInfo clientinfo = 1; + + Message message = 2; +} + +//------------------------------------------------------------------------------ +// Basic data types +//------------------------------------------------------------------------------ + +message EmptySuccess { } + +message ValuedResponse { + + // The responsed value type + // - contiune: Use the responsed value and execute the next hook + // - ignore: Ignore the responsed value + // - stop_and_return: Use the responsed value and stop the chain executing + enum ResponsedType { + + CONTINUE = 0; + + IGNORE = 1; + + STOP_AND_RETURN = 2; + } + + ResponsedType type = 1; + + oneof value { + + // Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks + bool bool_result = 3; + + // Message result, used on the 'message.*' hooks + Message message = 4; + } +} + +message BrokerInfo { + + string version = 1; + + string sysdescr = 2; + + string uptime = 3; + + string datetime = 4; +} + +message HookSpec { + + // The registered hooks name + // + // Available value: + // "client.connect", "client.connack" + // "client.connected", "client.disconnected" + // "client.authenticate", "client.check_acl" + // "client.subscribe", "client.unsubscribe" + // + // "session.created", "session.subscribed" + // "session.unsubscribed", "session.resumed" + // "session.discarded", "session.takeovered" + // "session.terminated" + // + // "message.publish", "message.delivered" + // "message.acked", "message.dropped" + string name = 1; + + // The topic filters for message hooks + repeated string topics = 2; +} + +message ConnInfo { + + string node = 1; + + string clientid = 2; + + string username = 3; + + string peerhost = 4; + + uint32 sockport = 5; + + string proto_name = 6; + + string proto_ver = 7; + + uint32 keepalive = 8; +} + +message ClientInfo { + + string node = 1; + + string clientid = 2; + + string username = 3; + + string password = 4; + + string peerhost = 5; + + uint32 sockport = 6; + + string protocol = 7; + + string mountpoint = 8; + + bool is_superuser = 9; + + bool anonymous = 10; + + // common name of client TLS cert + string cn = 11; + + // subject of client TLS cert + string dn = 12; +} + +message Message { + + string node = 1; + + string id = 2; + + uint32 qos = 3; + + string from = 4; + + string topic = 5; + + bytes payload = 6; + + uint64 timestamp = 7; + + // The key of header can be: + // - username: + // * Readonly + // * The username of sender client + // * Value type: utf8 string + // - protocol: + // * Readonly + // * The protocol name of sender client + // * Value type: string enum with "mqtt", "mqtt-sn", ... + // - peerhost: + // * Readonly + // * The peerhost of sender client + // * Value type: ip address string + // - allow_publish: + // * Writable + // * Whether to allow the message to be published by emqx + // * Value type: string enum with "true", "false", default is "true" + // + // Notes: All header may be missing, which means that the message does not + // carry these headers. We can guarantee that clients coming from MQTT, + // MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will + // carry these headers, but there is no guarantee that messages published + // by other means will do, e.g. messages published by HTTP-API + map headers = 8; +} + +message Property { + + string name = 1; + + string value = 2; +} + +message TopicFilter { + + string name = 1; + + uint32 qos = 2; +} + +message SubOpts { + + // The QoS level + uint32 qos = 1; + + // The group name for shared subscription + string share = 2; + + // The Retain Handling option (MQTT v5.0) + // + // 0 = Send retained messages at the time of the subscribe + // 1 = Send retained messages at subscribe only if the subscription does + // not currently exist + // 2 = Do not send retained messages at the time of the subscribe + uint32 rh = 3; + + // The Retain as Published option (MQTT v5.0) + // + // If 1, Application Messages forwarded using this subscription keep the + // RETAIN flag they were published with. + // If 0, Application Messages forwarded using this subscription have the + // RETAIN flag set to 0. + // Retained messages sent when the subscription is established have the RETAIN flag set to 1. + uint32 rap = 4; + + // The No Local option (MQTT v5.0) + // + // If the value is 1, Application Messages MUST NOT be forwarded to a + // connection with a ClientID equal to the ClientID of the publishing + uint32 nl = 5; +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml new file mode 100644 index 0000000000..958aaf3fda --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml @@ -0,0 +1,111 @@ + + + + + + org.wso2.carbon.devicemgt-plugins + emqx-extensions + 6.0.13-SNAPSHOT + ../pom.xml + + + 4.0.0 + io.entgra.device.mgt.plugins.emqx.initializer + bundle + EMQX Extensions - Extension Hook Initializer + + + + org.wso2.carbon.devicemgt-plugins + io.entgra.device.mgt.plugins.emqx.exhook + jar-with-dependencies + + + org.wso2.carbon + org.wso2.carbon.core + provided + + + org.ops4j.pax.logging + pax-logging-api + provided + + + + + + org.apache.maven.wagon + wagon-ssh + 2.1 + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + ${wso2.maven.compiler.source} + ${wso2.maven.compiler.target} + + + + org.apache.felix + maven-scr-plugin + 1.7.2 + + + generate-scr-scrdescriptor + + scr + + + + + + org.apache.felix + maven-bundle-plugin + 1.4.0 + true + + + ${project.artifactId} + ${project.artifactId} + ${project.version} + io.entgra.device.mgt.plugins.emqx.initializer + io.entgra.device.mgt.plugins.emqx.initializer.internal + + io.entgra.device.mgt.plugins.emqx.exhook, + io.entgra.device.mgt.plugins.emqx.initializer;resolution:=optional, + org.apache.commons.logging, + org.wso2.carbon.core, + org.osgi.framework.*;version="${imp.package.version.osgi.framework}", + org.osgi.service.*;version="${imp.package.version.osgi.service}", + + + !io.entgra.device.mgt.plugins.emqx.initializer.internal, + io.entgra.device.mgt.plugins.emqx.initializer.* + + + + + + + \ No newline at end of file diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java new file mode 100644 index 0000000000..9c3ab4e4f6 --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.entgra.device.mgt.plugins.emqx.initializer; + +public class EmqxExhookDataHolder { + + private static final EmqxExhookDataHolder thisInstance = new EmqxExhookDataHolder(); + public static EmqxExhookDataHolder getInstance() { + return thisInstance; + } +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java new file mode 100644 index 0000000000..cff5f07945 --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.entgra.device.mgt.plugins.emqx.initializer; + +import io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookInitializer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.osgi.framework.BundleContext; +import org.osgi.service.component.ComponentContext; +import org.wso2.carbon.core.ServerShutdownHandler; +import org.wso2.carbon.core.ServerStartupObserver; + +/** + * @scr.component name="io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookServiceComponent" + * immediate="true" + */ +public class EmqxExhookServiceComponent { + private static final Log log = LogFactory.getLog(EmqxExhookServiceComponent.class); + + protected void activate(ComponentContext ctx) { + try { + + EmqxExhookInitializer initializer = new EmqxExhookInitializer(); + BundleContext bundleContext = ctx.getBundleContext(); + + bundleContext.registerService(ServerStartupObserver.class.getName(), initializer, null); + bundleContext.registerService(ServerShutdownHandler.class.getName(), initializer, null); + + if (log.isDebugEnabled()) { + log.debug("EmqxExhookServiceComponent has been successfully activated"); + } + } catch (Throwable e) { + log.error("Error occurred while activating EmqxExhookServiceComponent", e); + } + + } + + protected void deactivate(ComponentContext ctx) { + try { + if (log.isDebugEnabled()) { + log.debug("EmqxExhookServiceComponent has been successfully de-activated"); + } + } catch (Throwable e) { + log.error("Error occurred while de-activating EmqxExhookServiceComponent", e); + } + } +} diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java new file mode 100644 index 0000000000..a6c7e5184f --- /dev/null +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved. + * + * Entgra (Pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.entgra.device.mgt.plugins.emqx.initializer.internal; + + +import io.entgra.device.mgt.plugins.emqx.exhook.ExServer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.core.ServerShutdownHandler; +import org.wso2.carbon.core.ServerStartupObserver; + +import java.io.IOException; + +public class EmqxExhookInitializer implements ServerShutdownHandler, ServerStartupObserver { + ExServer exServer = null; + + private static final Log log = LogFactory.getLog(EmqxExhookInitializer.class); + + @Override + public void completingServerStartup() { + + } + + @Override + public void completedServerStartup() { + log.info("completedServerStartup() "); + Runnable r = new Runnable() { + @Override + public void run() { + exServer = new ExServer(); + try { + exServer.start(); + exServer.blockUntilShutdown(); + } catch (IOException e) { + log.error("Error while starting the EMQX extension server"); + throw new RuntimeException(e); + } catch (InterruptedException e) { + log.error("Error while blocking until shutdown"); + throw new RuntimeException(e); + } + } + }; + + new Thread(r).start(); + } + + @Override + public void invoke() { + try { + exServer.stop(); + } catch (InterruptedException e) { + log.error("Error while stopping the EMQX Extension server"); + + } + } +} diff --git a/components/extensions/emqx-extensions/pom.xml b/components/extensions/emqx-extensions/pom.xml new file mode 100644 index 0000000000..35cb7afab6 --- /dev/null +++ b/components/extensions/emqx-extensions/pom.xml @@ -0,0 +1,39 @@ + + + + + + + org.wso2.carbon.devicemgt-plugins + extensions + 6.0.13-SNAPSHOT + ../pom.xml + + + 4.0.0 + emqx-extensions + pom + EMQX Extensions + + + io.entgra.device.mgt.plugins.emqx.exhook + io.entgra.device.mgt.plugins.emqx.initializer + + + diff --git a/components/extensions/pom.xml b/components/extensions/pom.xml index 86ccec15bd..0dd38cbe84 100644 --- a/components/extensions/pom.xml +++ b/components/extensions/pom.xml @@ -36,6 +36,7 @@ cdmf-transport-adapters pull-notification-listeners + emqx-extensions diff --git a/pom.xml b/pom.xml index b826ea75f8..264626acbe 100644 --- a/pom.xml +++ b/pom.xml @@ -1013,6 +1013,24 @@ org.wso2.carbon.andes.extensions.device.mgt.api ${carbon.devicemgt.plugins.version} + + + + org.wso2.carbon.devicemgt-plugins + io.entgra.device.mgt.plugins.emqx.exhook + jar-with-dependencies + ${carbon.devicemgt.plugins.version} + + + org.wso2.carbon.devicemgt-plugins + io.entgra.device.mgt.plugins.emqx.initializer + ${carbon.devicemgt.plugins.version} + + + org.ops4j.pax.logging + pax-logging-api + ${pax.logging.api.version} + commons-lang commons-lang @@ -1117,6 +1135,35 @@ google-api-services-androidenterprise v1-rev214-1.25.0 + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + org.apache.tomcat + annotations-api + ${tomcat-annotations-api} + provided + + + commons-lang.wso2 + commons-lang + ${commons-lang-wso2} + provided + @@ -1140,7 +1187,7 @@ 4.6.2 [4.5.0, 5.0.0) 4.4.9 - + 1.11.2 5.1.2 @@ -1307,6 +1354,10 @@ [1.6.0, 2.0.0) [1.2.0,1.3.0) + + 1.51.0 + 6.0.53 + 2.6.0.wso2v1 @@ -1404,6 +1455,7 @@ + org.apache.maven.plugins maven-assembly-plugin 2.2-beta-2 From f451529e6d6de58bc74456d223026e826a6f6815 Mon Sep 17 00:00:00 2001 From: Amalka Subasinghe Date: Sat, 18 Mar 2023 01:49:28 +0530 Subject: [PATCH 2/4] fixing mqtt lister with content transformation and validation --- .../transformer/MQTTContentTransformer.java | 13 +- .../validator/MQTTContentValidator.java | 9 +- .../pom.xml | 9 +- .../mqtt/util/MQTTAdapterListener.java | 133 ++++++------------ .../mqtt/util/MQTTEventAdapterConstants.java | 1 + .../adapter/mqtt/util/PropertyUtils.java | 10 +- pom.xml | 7 +- 7 files changed, 86 insertions(+), 96 deletions(-) diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java index fba9340096..66f2b4ee8d 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/transformer/MQTTContentTransformer.java @@ -35,6 +35,7 @@ import java.util.Map; public class MQTTContentTransformer implements ContentTransformer { private static final String MQTT_CONTENT_TRANSFORMER = "device-meta-transformer"; private static final String TOPIC = "topic"; + private static final String DEVICE_ID_INDEX = "deviceIdIndex"; private static String JSON_ARRAY_START_CHAR = "["; private static final Log log = LogFactory.getLog(MQTTContentTransformer.class); @@ -47,15 +48,19 @@ public class MQTTContentTransformer implements ContentTransformer { @Override public Object transform(Object messagePayload, Map dynamicProperties) { String topic = (String) dynamicProperties.get(TOPIC); + if (!dynamicProperties.containsKey(DEVICE_ID_INDEX)) { + log.error("device id not found in topic "); + return false; + } + int deviceIdIndex = (int)dynamicProperties.get(DEVICE_ID_INDEX); String topics[] = topic.split("/"); - String deviceId = topics[2]; - String deviceType = topics[1]; + String deviceId = topics[deviceIdIndex]; String message = (String) messagePayload; try { if (message.startsWith(JSON_ARRAY_START_CHAR)) { - return processMultipleEvents(message, deviceId, deviceType); + return processMultipleEvents(message, deviceId, deviceId); } else { - return processSingleEvent(message, deviceId, deviceType); + return processSingleEvent(message, deviceId, deviceId); } } catch (ParseException e) { log.error("Invalid input " + message, e); diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java index 20261ab6d9..71d68d6eaa 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.extension/src/main/java/org/wso2/carbon/device/mgt/input/adapter/extension/validator/MQTTContentValidator.java @@ -36,6 +36,7 @@ public class MQTTContentValidator implements ContentValidator { private static final String DEVICE_ID_JSON_PATH = "event.metaData.deviceId"; private static final String DEVICE_TYPE_JSON_PATH = "event.metaData.deviceId"; private static final String TOPIC = "topic"; + private static final String DEVICE_ID_INDEX = "deviceIdIndex"; private static final int DEVICE_ID_TOPIC_HIERARCHY_INDEX = 2; @Override @@ -46,9 +47,13 @@ public class MQTTContentValidator implements ContentValidator { @Override public ContentInfo validate(Object msgPayload, Map dynamicParams) { String topic = (String) dynamicParams.get(TOPIC); + if (!dynamicParams.containsKey(DEVICE_ID_INDEX)) { + log.error("device id not found in topic "); + return null; + } + int deviceIdIndex = (int)dynamicParams.get(DEVICE_ID_INDEX); String topics[] = topic.split("/"); - int deviceIdInTopicHierarchyLevelIndex = DEVICE_ID_TOPIC_HIERARCHY_INDEX; - String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex]; + String deviceIdFromTopic = topics[deviceIdIndex]; boolean status; String message = (String) msgPayload; if (message.startsWith(JSON_ARRAY_START_CHAR)) { diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml index ea044a7234..e7ac5c9337 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/pom.xml @@ -80,6 +80,10 @@ org.wso2.carbon.devicemgt org.wso2.carbon.identity.jwt.client.extension + + org.wso2.carbon.devicemgt + org.wso2.carbon.apimgt.keymgt.extension + org.wso2.carbon org.wso2.carbon.user.api @@ -151,7 +155,10 @@ org.wso2.carbon.utils.multitenancy, org.apache.axis2.context, org.wso2.carbon.core.multitenancy.utils, - org.wso2.carbon.utils + org.wso2.carbon.utils, + org.wso2.carbon.apimgt.keymgt.extension;version="[5.0,6)", + org.wso2.carbon.apimgt.keymgt.extension.exception;version="[5.0,6)", + org.wso2.carbon.apimgt.keymgt.extension.service;version="[5.0,6)" diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java index e41cfea50d..395f07aecc 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTAdapterListener.java @@ -17,22 +17,18 @@ */ package org.wso2.carbon.device.mgt.input.adapter.mqtt.util; -import org.apache.axis2.context.ConfigurationContext; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.conn.HttpHostConnectException; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.message.BasicHeader; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; +import org.wso2.carbon.apimgt.keymgt.extension.DCRResponse; +import org.wso2.carbon.apimgt.keymgt.extension.TokenRequest; +import org.wso2.carbon.apimgt.keymgt.extension.TokenResponse; +import org.wso2.carbon.apimgt.keymgt.extension.exception.BadRequestException; +import org.wso2.carbon.apimgt.keymgt.extension.exception.KeyMgtException; +import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtService; +import org.wso2.carbon.apimgt.keymgt.extension.service.KeyMgtServiceImpl; import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.core.ServerStatus; import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils; @@ -43,18 +39,10 @@ import org.wso2.carbon.device.mgt.input.adapter.mqtt.internal.InputAdapterServic import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration; import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener; import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException; -import org.wso2.carbon.identity.jwt.client.extension.dto.AccessTokenInfo; import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException; -import org.wso2.carbon.identity.jwt.client.extension.service.JWTClientManagerService; import org.wso2.carbon.user.api.UserStoreException; import org.wso2.carbon.utils.multitenancy.MultitenantConstants; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; @@ -68,6 +56,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration; private String topic; + private String topicStructure; private String tenantDomain; private volatile boolean connectionSucceeded = false; private ContentValidator contentValidator; @@ -88,13 +77,10 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { this.mqttBrokerConnectionConfiguration = mqttBrokerConnectionConfiguration; this.cleanSession = mqttBrokerConnectionConfiguration.isCleanSession(); int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive(); - this.topic = PropertyUtils.replaceTenantDomainProperty(topic); + this.topicStructure = new String(topic); + this.topic = PropertyUtils.replacePlaceholders(topic); this.eventAdapterListener = inputEventAdapterListener; - this.tenantDomain = this.topic.split("/")[0]; - //this is to allow server listener from IoT Core to connect. - if (this.tenantDomain.equals("+")) { - this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); - } + this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(); //SORTING messages until the server fetches them String temp_directory = System.getProperty("java.io.tmpdir"); @@ -146,58 +132,21 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { //getJWT Client Parameters. if (dcrUrlString != null && !dcrUrlString.isEmpty()) { try { - URL dcrUrl = new URL(dcrUrlString); - HttpClient httpClient = MQTTUtil.getHttpClient(dcrUrl.getProtocol()); - HttpPost postMethod = new HttpPost(dcrUrlString); - RegistrationProfile registrationProfile = new RegistrationProfile(); - registrationProfile.setCallbackUrl(MQTTEventAdapterConstants.EMPTY_STRING); - registrationProfile.setGrantType(MQTTEventAdapterConstants.GRANT_TYPE); - registrationProfile.setOwner(username); - registrationProfile.setTokenScope(MQTTEventAdapterConstants.TOKEN_SCOPE); - if (!mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { - registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName() + - "_" + tenantDomain); - registrationProfile.setIsSaasApp(false); - } else { - registrationProfile.setClientName(MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX - + mqttBrokerConnectionConfiguration.getAdapterName()); - registrationProfile.setIsSaasApp(true); - } - String jsonString = registrationProfile.toJSON(); - StringEntity requestEntity = new StringEntity(jsonString, ContentType.APPLICATION_JSON); - postMethod.setEntity(requestEntity); - String basicAuth = getBase64Encode(username, password); - postMethod.setHeader(new BasicHeader(MQTTEventAdapterConstants.AUTHORIZATION_HEADER_NAME, - MQTTEventAdapterConstants.AUTHORIZATION_HEADER_VALUE_PREFIX + - basicAuth)); - HttpResponse httpResponse = httpClient.execute(postMethod); - if (httpResponse != null) { - String response = MQTTUtil.getResponseString(httpResponse); - try { - if (response != null) { - JSONParser jsonParser = new JSONParser(); - JSONObject jsonPayload = (JSONObject) jsonParser.parse(response); - String clientId = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_ID); - String clientSecret = (String) jsonPayload.get(MQTTEventAdapterConstants.CLIENT_SECRET); - connectionOptions.setUserName(getToken(clientId, clientSecret)); - } - } catch (ParseException e) { - String msg = "error occurred while parsing generating token for the adapter"; - log.error(msg, e); - } - } - } catch (HttpHostConnectException e) { - log.error("Keymanager is unreachable, Waiting...."); - return false; - } catch (MalformedURLException e) { - log.error("Invalid dcrUrl : " + dcrUrlString); - return false; + KeyMgtService keyMgtService = new KeyMgtServiceImpl(); + String applicationName = MQTTEventAdapterConstants.APPLICATION_NAME_PREFIX + + mqttBrokerConnectionConfiguration.getAdapterName(); + DCRResponse dcrResponse = keyMgtService.dynamicClientRegistration(applicationName, username, + "client_credentials", null, new String[]{"device_management"}, false, Integer.MAX_VALUE); + String accessToken = getToken(dcrResponse.getClientId(), dcrResponse.getClientSecret()); + connectionOptions.setUserName(accessToken.substring(0, 18)); + connectionOptions.setPassword(accessToken.substring(19).toCharArray()); + + } catch (JWTClientException | UserStoreException e) { - log.error("Failed to create an oauth token with jwt grant type.", e); + log.error("Failed to create an oauth token with client_credentials grant type.", e); return false; - } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | IOException e) { - log.error("Failed to create a http connection.", e); + } catch (KeyMgtException e) { + log.error("Failed to create an application.", e); return false; } } @@ -249,7 +198,8 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { try { - String msgText = mqttMessage.toString(); + String mqttMsgString = mqttMessage.toString(); + String msgText = mqttMsgString.substring(mqttMsgString.indexOf("{"), mqttMsgString.indexOf("}") +1); if (log.isDebugEnabled()) { log.debug(msgText); } @@ -267,10 +217,18 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { log.debug("Event received in MQTT Event Adapter - " + msgText); } + if (contentValidator != null && contentTransformer != null) { ContentInfo contentInfo; Map dynamicProperties = new HashMap<>(); dynamicProperties.put(MQTTEventAdapterConstants.TOPIC, topic); + int deviceIdIndex = -1; + for (String topicStructurePart : topicStructure.split("/")) { + deviceIdIndex++; + if (topicStructurePart.contains("+")) { + dynamicProperties.put(MQTTEventAdapterConstants.DEVICE_ID_INDEX, deviceIdIndex); + } + } Object transformedMessage = contentTransformer.transform(msgText, dynamicProperties); contentInfo = contentValidator.validate(transformedMessage, dynamicProperties); if (contentInfo != null && contentInfo.isValidContent()) { @@ -326,21 +284,22 @@ public class MQTTAdapterListener implements MqttCallback, Runnable { PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(tenantDomain, true); try { String scopes = mqttBrokerConnectionConfiguration.getBrokerScopes(); - String username = mqttBrokerConnectionConfiguration.getUsername(); - if (mqttBrokerConnectionConfiguration.isGlobalCredentailSet()) { - username = PrivilegedCarbonContext.getThreadLocalCarbonContext() - .getUserRealm().getRealmConfiguration().getAdminUserName() + "@" + PrivilegedCarbonContext - .getThreadLocalCarbonContext().getTenantDomain(true); - } + scopes += " perm:topic:sub:" + this.topic.replace("/",":"); + scopes += " perm:topic:pub:" + this.topic.replace("/",":"); + + TokenRequest tokenRequest = new TokenRequest(clientId, clientSecret, + null, scopes.toString(), "client_credentials", null, + null, null, null, Integer.MAX_VALUE); + KeyMgtService keyMgtService = new KeyMgtServiceImpl(); + TokenResponse tokenResponse = keyMgtService.generateAccessToken(tokenRequest); - JWTClientManagerService jwtClientManagerService = - InputAdapterServiceDataHolder.getJwtClientManagerService(); - AccessTokenInfo accessTokenInfo = jwtClientManagerService.getJWTClient().getAccessToken( - clientId, clientSecret, username, scopes); - return accessTokenInfo.getAccessToken(); + return tokenResponse.getAccessToken(); + } catch (KeyMgtException | BadRequestException e) { + log.error("Error while generating access token", e); } finally { PrivilegedCarbonContext.endTenantFlow(); } + return null; } private String getBase64Encode(String key, String value) { diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java index 7c80d8a1d3..663f25b727 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/MQTTEventAdapterConstants.java @@ -64,6 +64,7 @@ public class MQTTEventAdapterConstants { public static final String EMPTY = ""; public static final String MQTT_CONTENT_VALIDATION_DEFAULT_PARAMETERS = ""; public static final String TOPIC = "topic"; + public static final String DEVICE_ID_INDEX = "deviceIdIndex"; public static final String PAYLOAD = "payload"; public static final String AUTHORIZATION_HEADER_NAME = "Authorization"; public static final String AUTHORIZATION_HEADER_VALUE_PREFIX = "Basic "; diff --git a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java index 682a292f7f..1415a16849 100644 --- a/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java +++ b/components/extensions/cdmf-transport-adapters/input/org.wso2.carbon.device.mgt.input.adapter.mqtt/src/main/java/org/wso2/carbon/device/mgt/input/adapter/mqtt/util/PropertyUtils.java @@ -25,7 +25,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; class PropertyUtils { - private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenant-domain\\}"; + private static final String TENANT_DOMAIN_PROPERTY = "\\$\\{tenantDomain\\}"; + private static final String DEVICE_TYPE_PROPERTY = "\\$\\{deviceType\\}"; + private static final String DEVICE_ID_PROPERTY = "\\$\\{deviceId\\}"; //This method is only used if the mb features are within DAS. static String replaceMqttProperty(String urlWithPlaceholders) throws InputEventAdapterException { @@ -50,4 +52,10 @@ class PropertyUtils { .getThreadLocalCarbonContext().getTenantDomain(true)); return urlWithPlaceholders; } + + public static String replacePlaceholders(String urlWithPlaceholders) { + urlWithPlaceholders = urlWithPlaceholders.replaceAll(TENANT_DOMAIN_PROPERTY, "+") + .replaceAll(DEVICE_TYPE_PROPERTY, "+").replaceAll(DEVICE_ID_PROPERTY, "+"); + return urlWithPlaceholders; + } } diff --git a/pom.xml b/pom.xml index 264626acbe..5ebba9e788 100644 --- a/pom.xml +++ b/pom.xml @@ -305,6 +305,11 @@ org.wso2.carbon.identity.jwt.client.extension ${carbon.devicemgt.version} + + org.wso2.carbon.devicemgt + org.wso2.carbon.apimgt.keymgt.extension + ${carbon.devicemgt.version} + org.wso2.carbon.devicemgt org.wso2.carbon.apimgt.application.extension @@ -1170,7 +1175,7 @@ - 5.0.20 + 5.0.21-SNAPSHOT [5.0.0, 6.0.0) From 8eeefb7c6e97884b0010df3baaef1a69c517beb1 Mon Sep 17 00:00:00 2001 From: Amalka Subasinghe Date: Sat, 18 Mar 2023 08:53:48 +0530 Subject: [PATCH 3/4] improve emmqx initalizer --- .../initializer/{internal => }/EmqxExhookInitializer.java | 2 +- .../emqx/initializer/{ => internal}/EmqxExhookDataHolder.java | 2 +- .../{ => internal}/EmqxExhookServiceComponent.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/{internal => }/EmqxExhookInitializer.java (97%) rename components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/{ => internal}/EmqxExhookDataHolder.java (93%) rename components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/{ => internal}/EmqxExhookServiceComponent.java (94%) diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookInitializer.java similarity index 97% rename from components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java rename to components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookInitializer.java index a6c7e5184f..b713820f7a 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookInitializer.java @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -package io.entgra.device.mgt.plugins.emqx.initializer.internal; +package io.entgra.device.mgt.plugins.emqx.initializer; import io.entgra.device.mgt.plugins.emqx.exhook.ExServer; diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookDataHolder.java similarity index 93% rename from components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java rename to components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookDataHolder.java index 9c3ab4e4f6..4810aad812 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookDataHolder.java @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -package io.entgra.device.mgt.plugins.emqx.initializer; +package io.entgra.device.mgt.plugins.emqx.initializer.internal; public class EmqxExhookDataHolder { diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookServiceComponent.java similarity index 94% rename from components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java rename to components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookServiceComponent.java index cff5f07945..71c896ea7f 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookServiceComponent.java @@ -15,9 +15,9 @@ * specific language governing permissions and limitations * under the License. */ -package io.entgra.device.mgt.plugins.emqx.initializer; +package io.entgra.device.mgt.plugins.emqx.initializer.internal; -import io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookInitializer; +import io.entgra.device.mgt.plugins.emqx.initializer.EmqxExhookInitializer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.osgi.framework.BundleContext; From 084a4879549a03382155e4768145fda90e1816de Mon Sep 17 00:00:00 2001 From: Amalka Subasinghe Date: Sat, 18 Mar 2023 12:21:20 +0530 Subject: [PATCH 4/4] fixing - setting device connection status --- .../mgt/plugins/emqx/exhook/ExServer.java | 71 ++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java index 7fe87ca9bc..9f4f593495 100644 --- a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java +++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java @@ -36,7 +36,13 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager; import org.wso2.carbon.device.mgt.core.config.keymanager.KeyManagerConfigurations; - +import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder; +import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService; +import org.wso2.carbon.device.mgt.common.EnrolmentInfo; +import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException; +import org.wso2.carbon.device.mgt.common.DeviceIdentifier; +import org.wso2.carbon.context.CarbonContext; +import org.wso2.carbon.context.PrivilegedCarbonContext; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; @@ -155,9 +161,45 @@ public class ExServer { responseObserver.onCompleted(); } + public static DeviceManagementProviderService getDeviceManagementService() { + PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + DeviceManagementProviderService deviceManagementProviderService = + (DeviceManagementProviderService) ctx.getOSGiService(DeviceManagementProviderService.class, null); + if (deviceManagementProviderService == null) { + String msg = "DeviceImpl Management provider service has not initialized."; + logger.error(msg); +// throw new IllegalStateException(msg); + } + return deviceManagementProviderService; + } @Override public void onClientConnack(ClientConnackRequest request, StreamObserver responseObserver) { DEBUG("onClientConnack", request); + if (request.getResultCode().equals("success")) { + String accessToken = accessTokenMap.get(request.getConninfo().getClientid()); + String scopeString = authorizedScopeMap.get(accessToken); + String[] scopeArray = scopeString.split(" "); + String deviceType = null; + String deviceId = null; + for (String scope : scopeArray) { + if (scope.startsWith("device_")) { + String[] scopeParts = scope.split("_"); + deviceType = scopeParts[1]; + deviceId = scopeParts[2]; + break; + } + } + if (!StringUtils.isEmpty(deviceType) && !StringUtils.isEmpty(deviceId)) { + try { + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain("carbon.super"); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(-1234); + DeviceManagementProviderService deviceManagementProviderService = getDeviceManagementService(); + deviceManagementProviderService.changeDeviceStatus(new DeviceIdentifier(deviceId, deviceType), EnrolmentInfo.Status.ACTIVE); + } catch (DeviceManagementException e) { + logger.error("onClientConnack: Error while setting device status"); + } + } + } EmptySuccess reply = EmptySuccess.newBuilder().build(); responseObserver.onNext(reply); responseObserver.onCompleted(); @@ -396,6 +438,33 @@ public class ExServer { @Override public void onSessionTerminated(SessionTerminatedRequest request, StreamObserver responseObserver) { DEBUG("onSessionTerminated", request); + + String accessToken = accessTokenMap.get(request.getClientinfo().getClientid()); + if (!StringUtils.isEmpty(accessToken)) { + String scopeString = authorizedScopeMap.get(accessToken); + String[] scopeArray = scopeString.split(" "); + String deviceType = null; + String deviceId = null; + for (String scope : scopeArray) { + if (scope.startsWith("device_")) { + String[] scopeParts = scope.split("_"); + deviceType = scopeParts[1]; + deviceId = scopeParts[2]; + break; + } + } + if (!StringUtils.isEmpty(deviceType) && !StringUtils.isEmpty(deviceId)) { + try { + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain("carbon.super"); + PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(-1234); + DeviceManagementProviderService deviceManagementProviderService = getDeviceManagementService();; + deviceManagementProviderService.changeDeviceStatus(new DeviceIdentifier(deviceId, deviceType), EnrolmentInfo.Status.UNREACHABLE); + } catch (DeviceManagementException e) { + logger.error("onSessionTerminated: Error while setting device status"); + } + } + } + EmptySuccess reply = EmptySuccess.newBuilder().build(); responseObserver.onNext(reply); responseObserver.onCompleted();