diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml
new file mode 100644
index 000000000..8b80bc0c2
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/pom.xml
@@ -0,0 +1,130 @@
+
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ emqx-extensions
+ 6.0.13-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ io.entgra.device.mgt.plugins.emqx.exhook
+ jar
+ EMQX Extensions - Extension Hook
+
+
+
+
+ io.grpc
+ grpc-netty-shaded
+
+
+ io.grpc
+ grpc-protobuf
+
+
+ io.grpc
+ grpc-stub
+
+
+ org.apache.tomcat
+ annotations-api
+ provided
+
+
+ org.apache.httpcomponents
+ httpclient
+ provided
+
+
+ com.google.code.gson
+ gson
+ provided
+
+
+ org.wso2.carbon
+ org.wso2.carbon.core
+ provided
+
+
+ commons-lang.wso2
+ commons-lang
+ provided
+
+
+ org.wso2.carbon.devicemgt
+ org.wso2.carbon.device.mgt.core
+ provided
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier}
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+
+ io.entgra.device.mgt.plugins.emqx.exhook.ExServer
+
+
+
+ jar-with-dependencies
+
+
+
+
+ assemble-all
+ package
+
+ single
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java
new file mode 100644
index 000000000..7fe87ca9b
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ExServer.java
@@ -0,0 +1,477 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.entgra.device.mgt.plugins.emqx.exhook;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessageV3;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
+import org.wso2.carbon.device.mgt.core.config.keymanager.KeyManagerConfigurations;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class ExServer {
+ private static final Log logger = LogFactory.getLog(ExServer.class.getName());
+
+ private static Map accessTokenMap = new HashMap<>();
+ private static Map authorizedScopeMap = new HashMap<>();
+ private Server server;
+
+ public ExServer() {
+ }
+
+ public void start() throws IOException {
+ /* The port on which the server should run */
+ int port = 9000;
+
+ server = ServerBuilder.forPort(port)
+ .addService(new HookProviderImpl())
+ .build()
+ .start();
+ logger.info("Server started, listening on " + port);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+ System.err.println("*** shutting down gRPC server since JVM is shutting down");
+ try {
+ ExServer.this.stop();
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ }
+ System.err.println("*** server shut down");
+ }
+ });
+ }
+
+ public void stop() throws InterruptedException {
+ if (server != null) {
+ server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Await termination on the main thread since the grpc library uses daemon threads.
+ */
+ public void blockUntilShutdown() throws InterruptedException {
+ if (server != null) {
+ server.awaitTermination();
+ }
+ }
+
+ /**
+ * Main launches the server from the command line.
+ */
+ public static void main(String[] args) throws IOException, InterruptedException {
+ final ExServer server = new ExServer();
+ server.start();
+ server.blockUntilShutdown();
+ }
+
+ static class HookProviderImpl extends HookProviderGrpc.HookProviderImplBase {
+
+
+
+ public void DEBUG(String fn, Object req) {
+ System.out.printf(fn + ", request: " + req);
+ }
+
+ @Override
+ public void onProviderLoaded(ProviderLoadedRequest request, StreamObserver responseObserver) {
+ DEBUG("onProviderLoaded", request);
+ HookSpec[] specs = {
+ HookSpec.newBuilder().setName("client.connect").build(),
+ HookSpec.newBuilder().setName("client.connack").build(),
+ HookSpec.newBuilder().setName("client.connected").build(),
+ HookSpec.newBuilder().setName("client.disconnected").build(),
+ HookSpec.newBuilder().setName("client.authenticate").build(),
+ HookSpec.newBuilder().setName("client.check_acl").build(),
+ HookSpec.newBuilder().setName("client.subscribe").build(),
+ HookSpec.newBuilder().setName("client.unsubscribe").build(),
+
+ HookSpec.newBuilder().setName("session.created").build(),
+ HookSpec.newBuilder().setName("session.subscribed").build(),
+ HookSpec.newBuilder().setName("session.unsubscribed").build(),
+ HookSpec.newBuilder().setName("session.resumed").build(),
+ HookSpec.newBuilder().setName("session.discarded").build(),
+ HookSpec.newBuilder().setName("session.takeovered").build(),
+ HookSpec.newBuilder().setName("session.terminated").build(),
+
+ HookSpec.newBuilder().setName("message.publish").build(),
+ HookSpec.newBuilder().setName("message.delivered").build(),
+ HookSpec.newBuilder().setName("message.acked").build(),
+ HookSpec.newBuilder().setName("message.dropped").build()
+ };
+ LoadedResponse reply = LoadedResponse.newBuilder().addAllHooks(Arrays.asList(specs)).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onProviderUnloaded(ProviderUnloadedRequest request, StreamObserver responseObserver) {
+ DEBUG("onProviderUnloaded", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onClientConnect(ClientConnectRequest request, StreamObserver responseObserver) {
+ logger.info("onClientConnect -----------------------------");
+ DEBUG("onClientConnect", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onClientConnack(ClientConnackRequest request, StreamObserver responseObserver) {
+ DEBUG("onClientConnack", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onClientConnected(ClientConnectedRequest request, StreamObserver responseObserver) {
+ DEBUG("onClientConnected", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onClientDisconnected(ClientDisconnectedRequest request, StreamObserver responseObserver) {
+ logger.info("onClientDisconnected -----------------------------");
+ DEBUG("onClientDisconnected", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onClientAuthenticate(ClientAuthenticateRequest request, StreamObserver responseObserver) {
+ DEBUG("onClientAuthenticate", request);
+
+ if (!StringUtils.isEmpty(request.getClientinfo().getUsername()) &&
+ !StringUtils.isEmpty(request.getClientinfo().getPassword())) {
+
+ DEBUG("on access token passes", request);
+ try {
+ String accessToken = request.getClientinfo().getUsername() + "-" + request.getClientinfo().getPassword();
+ KeyManagerConfigurations keyManagerConfig = DeviceConfigurationManager.getInstance()
+ .getDeviceManagementConfig().getKeyManagerConfigurations();
+
+ HttpPost tokenEndpoint = new HttpPost(
+ keyManagerConfig.getServerUrl() + HandlerConstants.INTROSPECT_ENDPOINT);
+ tokenEndpoint.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString());
+ tokenEndpoint.setHeader(HttpHeaders.AUTHORIZATION, HandlerConstants.BASIC + Base64.getEncoder()
+ .encodeToString((keyManagerConfig.getAdminUsername() + HandlerConstants.COLON
+ + keyManagerConfig.getAdminPassword()).getBytes()));
+ StringEntity tokenEPPayload = new StringEntity("token=" + accessToken,
+ ContentType.APPLICATION_FORM_URLENCODED);
+ tokenEndpoint.setEntity(tokenEPPayload);
+ ProxyResponse tokenStatus = HandlerUtil.execute(tokenEndpoint);
+
+ if (tokenStatus.getExecutorResponse().contains(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX)) {
+ if (tokenStatus.getCode() == HttpStatus.SC_UNAUTHORIZED) {
+ // return with error
+ logger.error("Unauthorized");
+ responseObserver.onError(new Exception("unauthorized"));
+ return;
+ } else {
+ // return with error
+ logger.error("error occurred while checking access token");
+ responseObserver.onError(new Exception("error occurred while checking access token"));
+ return;
+ }
+ }
+
+ String tokenData = tokenStatus.getData();
+ if (tokenData == null) {
+ // return with error
+ logger.error("invalid token data is received");
+ responseObserver.onError(new Exception("invalid token data is received"));
+ return;
+ }
+ JsonParser jsonParser = new JsonParser();
+ JsonElement jTokenResult = jsonParser.parse(tokenData);
+ if (jTokenResult.isJsonObject()) {
+ JsonObject jTokenResultAsJsonObject = jTokenResult.getAsJsonObject();
+ if (!jTokenResultAsJsonObject.get("active").getAsBoolean()) {
+ logger.error("access token is expired");
+ responseObserver.onError(new Exception("access token is expired"));
+ return;
+ }
+ // success
+ accessTokenMap.put(request.getClientinfo().getClientid(), accessToken);
+ authorizedScopeMap.put(accessToken, jTokenResultAsJsonObject.get("scope").getAsString());
+ logger.info("authenticated");
+ ValuedResponse reply = ValuedResponse.newBuilder()
+ .setBoolResult(true)
+ .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+// } else {
+// ValuedResponse reply = ValuedResponse.newBuilder()
+// .setBoolResult(true)
+// .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+// .build();
+// responseObserver.onNext(reply);
+// responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public void onClientCheckAcl(ClientCheckAclRequest request, StreamObserver responseObserver) {
+ DEBUG("onClientCheckAcl", request);
+ /*
+ carbon.super/deviceType/deviceId
+ data/carbon.super/deviceType/deviceId
+ data/carbonsuper/deviceType/deviceId
+ republished/deviceType
+ */
+ if (!StringUtils.isEmpty(request.getClientinfo().getUsername()) &&
+ StringUtils.isEmpty(request.getClientinfo().getPassword())) {
+ //todo: check token validity
+ String accessToken = accessTokenMap.get(request.getClientinfo().getClientid());
+ if (StringUtils.isEmpty(accessToken) || !accessToken.startsWith(request.getClientinfo().getUsername())) {
+ logger.info("Valid access token not found");
+ responseObserver.onError(new Exception("not authorized"));
+ }
+
+ String authorizedScopeList = authorizedScopeMap.get(accessToken);
+ String[] scopeArray = authorizedScopeList.split(" ");
+ List scopeList = Arrays.asList(scopeArray);
+ boolean isFound = false;
+
+ String tempScope = null;
+ if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE) ||
+ request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
+
+ String requestTopic = request.getTopic();
+
+ if (requestTopic.endsWith("/#")) {
+ requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
+ }
+
+ // replace / with :
+ requestTopic = requestTopic.replace("/", ":");
+
+ if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
+ tempScope = "perm:topic:sub:" + requestTopic;
+ }
+ if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
+ tempScope = "perm:topic:pub:" + requestTopic;
+ }
+
+ if (scopeList.contains(tempScope)) {
+ isFound = true;
+ }
+
+ if (isFound) {
+ ValuedResponse reply = ValuedResponse.newBuilder()
+ .setBoolResult(true)
+ .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+ .build();
+
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onError(new Exception("not authorized"));
+ }
+ }
+ } else {
+ //default
+ ValuedResponse reply = ValuedResponse.newBuilder()
+ .setBoolResult(true)
+ .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+ .build();
+
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public void onClientSubscribe(ClientSubscribeRequest request, StreamObserver responseObserver) {
+ DEBUG("onClientSubscribe", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onClientUnsubscribe(ClientUnsubscribeRequest request, StreamObserver responseObserver) {
+ DEBUG("onClientUnsubscribe", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionCreated(SessionCreatedRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionCreated", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionSubscribed(SessionSubscribedRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionSubscribed", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionUnsubscribed(SessionUnsubscribedRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionUnsubscribed", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionResumed(SessionResumedRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionResumed", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionDiscarded(SessionDiscardedRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionDdiscarded", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionTakeovered(SessionTakeoveredRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionTakeovered", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onSessionTerminated(SessionTerminatedRequest request, StreamObserver responseObserver) {
+ DEBUG("onSessionTerminated", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onMessagePublish(MessagePublishRequest request, StreamObserver responseObserver) {
+ DEBUG("onMessagePublish", request);
+
+ ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)");
+
+ Message nmsg = Message.newBuilder()
+ .setId (request.getMessage().getId())
+ .setNode (request.getMessage().getNode())
+ .setFrom (request.getMessage().getFrom())
+ .setTopic (request.getMessage().getTopic())
+ .setPayload(((GeneratedMessageV3) request).toByteString()).build();
+
+
+ ValuedResponse reply = ValuedResponse.newBuilder()
+ .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+ .setMessage(nmsg).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+// case2: stop publish the 't/d' messages
+// @Override
+// public void onMessagePublish(MessagePublishRequest request, StreamObserver responseObserver) {
+// DEBUG("onMessagePublish", request);
+//
+// Message nmsg = request.getMessage();
+// if ("t/d".equals(nmsg.getTopic())) {
+// ByteString bstr = ByteString.copyFromUtf8("");
+// nmsg = Message.newBuilder()
+// .setId (request.getMessage().getId())
+// .setNode (request.getMessage().getNode())
+// .setFrom (request.getMessage().getFrom())
+// .setTopic (request.getMessage().getTopic())
+// .setPayload(bstr)
+// .putHeaders("allow_publish", "false").build();
+// }
+//
+// ValuedResponse reply = ValuedResponse.newBuilder()
+// .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
+// .setMessage(nmsg).build();
+// responseObserver.onNext(reply);
+// responseObserver.onCompleted();
+// }
+
+ @Override
+ public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver responseObserver) {
+ DEBUG("onMessageDelivered", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onMessageAcked(MessageAckedRequest request, StreamObserver responseObserver) {
+ DEBUG("onMessageAcked", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onMessageDropped(MessageDroppedRequest request, StreamObserver responseObserver) {
+ DEBUG("onMessageDropped", request);
+ EmptySuccess reply = EmptySuccess.newBuilder().build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+
+ }
+
+
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java
new file mode 100644
index 000000000..aa7b09623
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.entgra.device.mgt.plugins.emqx.exhook;
+
+public class HandlerConstants {
+ public static final String INTROSPECT_ENDPOINT = "/oauth2/introspect";
+ public static final String BASIC = "Basic ";
+ public static final String EXECUTOR_EXCEPTION_PREFIX = "ExecutorException-";
+ public static final String TOKEN_IS_EXPIRED = "ACCESS_TOKEN_IS_EXPIRED";
+ public static final String COLON = ":";
+ public static final int INTERNAL_ERROR_CODE = 500;
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java
new file mode 100644
index 000000000..b0c3485cb
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/HandlerUtil.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.entgra.device.mgt.plugins.emqx.exhook;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+
+public class HandlerUtil {
+
+ private static KeyStore keyStore;
+ private static KeyStore trustStore;
+ private static char[] keyStorePassword;
+ private static SSLContext sslContext;
+
+ private static final String KEY_STORE_TYPE = "JKS";
+ /**
+ * Default truststore type of the client
+ */
+ private static final String TRUST_STORE_TYPE = "JKS";
+
+ private static final String KEY_MANAGER_TYPE = "SunX509"; //Default Key Manager Type
+ /**
+ * Default trustmanager type of the client
+ */
+ private static final String TRUST_MANAGER_TYPE = "SunX509"; //Default Trust Manager Type
+
+ private static final String SSLV3 = "SSLv3";
+
+ private static final Log log = LogFactory.getLog(HandlerUtil.class);
+
+ /***
+ *
+ * @param httpRequest - httpMethod e.g:- HttpPost, HttpGet
+ * @return response as string
+ * @throws IOException IO exception returns if error occurs when executing the httpMethod
+ */
+ public static ProxyResponse execute(HttpRequestBase httpRequest) throws IOException {
+ try (CloseableHttpClient client = getHttpClient()) {
+ HttpResponse response = client.execute(httpRequest);
+ ProxyResponse proxyResponse = new ProxyResponse();
+
+ if (response == null) {
+ log.error("Received null response for http request : " + httpRequest.getMethod() + " " + httpRequest
+ .getURI().toString());
+ proxyResponse.setCode(HandlerConstants.INTERNAL_ERROR_CODE);
+ proxyResponse.setStatus(ProxyResponse.Status.ERROR);
+ proxyResponse.setExecutorResponse(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(
+ HandlerConstants.INTERNAL_ERROR_CODE));
+ return proxyResponse;
+ } else {
+ int statusCode = response.getStatusLine().getStatusCode();
+ String jsonString = getResponseString(response);
+ if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_CREATED) {
+ proxyResponse.setCode(statusCode);
+ proxyResponse.setData(jsonString);
+ proxyResponse.setStatus(ProxyResponse.Status.SUCCESS);
+ proxyResponse.setExecutorResponse("SUCCESS");
+ proxyResponse.setHeaders(response.getAllHeaders());
+ return proxyResponse;
+ } else if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
+ if (isTokenExpired(jsonString)) {
+ proxyResponse.setCode(statusCode);
+ proxyResponse.setStatus(ProxyResponse.Status.ERROR);
+ proxyResponse.setExecutorResponse(HandlerConstants.TOKEN_IS_EXPIRED);
+ } else {
+ log.error(
+ "Received " + statusCode + " response for http request : " + httpRequest.getMethod()
+ + " " + httpRequest.getURI().toString() + ". Error message: " + jsonString);
+ proxyResponse.setCode(statusCode);
+ proxyResponse.setData(jsonString);
+ proxyResponse.setStatus(ProxyResponse.Status.ERROR);
+ proxyResponse.setExecutorResponse(
+ HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(statusCode));
+ }
+ return proxyResponse;
+ }
+ log.error("Received " + statusCode +
+ " response for http request : " + httpRequest.getMethod() + " " + httpRequest.getURI()
+ .toString() + ". Error message: " + jsonString);
+ proxyResponse.setCode(statusCode);
+ proxyResponse.setData(jsonString);
+ proxyResponse.setStatus(ProxyResponse.Status.ERROR);
+ proxyResponse
+ .setExecutorResponse(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(statusCode));
+ return proxyResponse;
+ }
+ }
+ }
+
+ public static boolean isTokenExpired(String jsonBody) {
+ return jsonBody.contains("Access token expired") || jsonBody
+ .contains("Invalid input. Access token validation failed");
+ }
+
+ /***
+ *
+ * @param statusCode Provide status code, e.g:- 400, 401, 500 etc
+ * @return relative status code key for given status code.
+ */
+ public static String getStatusKey(int statusCode) {
+ String statusCodeKey;
+
+ switch (statusCode) {
+ case HttpStatus.SC_INTERNAL_SERVER_ERROR:
+ statusCodeKey = "internalServerError";
+ break;
+ case HttpStatus.SC_BAD_REQUEST:
+ statusCodeKey = "badRequest";
+ break;
+ case HttpStatus.SC_UNAUTHORIZED:
+ statusCodeKey = "unauthorized";
+ break;
+ case HttpStatus.SC_FORBIDDEN:
+ statusCodeKey = "forbidden";
+ break;
+ case HttpStatus.SC_NOT_FOUND:
+ statusCodeKey = "notFound";
+ break;
+ case HttpStatus.SC_METHOD_NOT_ALLOWED:
+ statusCodeKey = "methodNotAllowed";
+ break;
+ case HttpStatus.SC_NOT_ACCEPTABLE:
+ statusCodeKey = "notAcceptable";
+ break;
+ case HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE:
+ statusCodeKey = "unsupportedMediaType";
+ break;
+ default:
+ statusCodeKey = "defaultPage";
+ break;
+ }
+ return statusCodeKey;
+ }
+
+ /**
+ * Retrieve Http client based on hostname verification.
+ *
+ * @return {@link CloseableHttpClient} http client
+ */
+ public static CloseableHttpClient getHttpClient() {
+
+ boolean isIgnoreHostnameVerification = Boolean.parseBoolean(System.
+ getProperty("org.wso2.ignoreHostnameVerification"));
+ if (isIgnoreHostnameVerification) {
+ return HttpClients.custom().setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build();
+ } else {
+ return HttpClients.createDefault();
+ }
+
+// String keyStorePassword = "wso2carbon";
+// String trustStorePassword = "wso2carbon";
+// String keyStoreLocation = "/home/amalka/entgra/source/product-switchgear/switchgear-plugin/integration-test/distro-prep/target/entgra-iot-pro-switchgear-repacked-1.0.0-SNAPSHOT/repository/resources/security/wso2carbon.jks";
+// String trustStoreLocation = "/home/amalka/entgra/source/product-switchgear/switchgear-plugin/integration-test/distro-prep/target/entgra-iot-pro-switchgear-repacked-1.0.0-SNAPSHOT/repository/resources/security/client-truststore.jks";
+//
+// //Call to load the keystore.
+// try {
+// loadKeyStore(keyStoreLocation, keyStorePassword);
+// } catch (IOException e) {
+// throw new RuntimeException(e);
+// } catch (CertificateException e) {
+// throw new RuntimeException(e);
+// } catch (NoSuchAlgorithmException e) {
+// throw new RuntimeException(e);
+// } catch (KeyStoreException e) {
+// throw new RuntimeException(e);
+// }
+// //Call to load the TrustStore.
+// try {
+// loadTrustStore(trustStoreLocation, trustStorePassword);
+// } catch (KeyStoreException e) {
+// throw new RuntimeException(e);
+// } catch (IOException e) {
+// throw new RuntimeException(e);
+// } catch (CertificateException e) {
+// throw new RuntimeException(e);
+// } catch (NoSuchAlgorithmException e) {
+// throw new RuntimeException(e);
+// }
+// //Create the SSL context with the loaded TrustStore/keystore.
+// try {
+// initSSLConnection();
+// } catch (NoSuchAlgorithmException e) {
+// throw new RuntimeException(e);
+// } catch (UnrecoverableKeyException e) {
+// throw new RuntimeException(e);
+// } catch (KeyStoreException e) {
+// throw new RuntimeException(e);
+// } catch (KeyManagementException e) {
+// throw new RuntimeException(e);
+// }
+//
+// return HttpClients.createDefault();
+
+ }
+
+ private static void loadKeyStore(String keyStorePath, String ksPassword)
+ throws IOException, CertificateException, NoSuchAlgorithmException, KeyStoreException {
+ InputStream fis = null;
+ try {
+ keyStorePassword = ksPassword.toCharArray();
+ keyStore = KeyStore.getInstance(KEY_STORE_TYPE);
+ fis = new FileInputStream(keyStorePath);
+ keyStore.load(fis, keyStorePassword);
+
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ }
+
+ private static void loadTrustStore(String trustStorePath, String tsPassword)
+ throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+
+ InputStream fis = null;
+ try {
+ trustStore = KeyStore.getInstance(TRUST_STORE_TYPE);
+ fis = new FileInputStream(trustStorePath);
+ trustStore.load(fis, tsPassword.toCharArray());
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ }
+
+ private static void initSSLConnection() throws NoSuchAlgorithmException, UnrecoverableKeyException,
+ KeyStoreException, KeyManagementException {
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KEY_MANAGER_TYPE);
+ keyManagerFactory.init(keyStore, keyStorePassword);
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TRUST_MANAGER_TYPE);
+ trustManagerFactory.init(trustStore);
+
+ // Create and initialize SSLContext for HTTPS communication
+ sslContext = SSLContext.getInstance(SSLV3);
+ sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
+ SSLContext.setDefault(sslContext);
+ }
+
+ public static String getResponseString(HttpResponse response) throws IOException {
+ try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
+ StringBuilder responseBuilder = new StringBuilder();
+ String line;
+ while ((line = rd.readLine()) != null) {
+ responseBuilder.append(line);
+ }
+ return responseBuilder.toString();
+ }
+ }
+
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java
new file mode 100644
index 000000000..1eadf8f20
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/java/io/entgra/device/mgt/plugins/emqx/exhook/ProxyResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.entgra.device.mgt.plugins.emqx.exhook;
+
+import org.apache.http.Header;
+
+public class ProxyResponse {
+
+ public static class Status {
+ public static int SUCCESS = 1;
+ public static int ERROR = 0;
+ }
+
+ private int code;
+ private String data;
+ private String executorResponse;
+ private int status;
+ private Header[] headers;
+
+ public int getCode() { return code; }
+
+ public void setCode(int code) { this.code = code; }
+
+ public String getData() { return data; }
+
+ public void setData(String data) { this.data = data; }
+
+ public String getExecutorResponse() { return executorResponse; }
+
+ public void setExecutorResponse(String executorResponse) { this.executorResponse = executorResponse; }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public Header[] getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Header[] headers) {
+ this.headers = headers;
+ }
+
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto
new file mode 100644
index 000000000..0e7cab764
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.exhook/src/main/proto/exhook.proto
@@ -0,0 +1,434 @@
+//
+// Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+//
+// Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+// Version 2.0 (the "License"); you may not use this file except
+// in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+syntax = "proto3";
+
+option csharp_namespace = "Emqx.Exhook.V1";
+option go_package = "emqx.io/grpc/exhook";
+option java_multiple_files = true;
+option java_package = "io.entgra.device.mgt.plugins.emqx.exhook";
+option java_outer_classname = "EmqxExHookProto";
+
+package emqx.exhook.v1;
+
+service HookProvider {
+
+ rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
+
+ rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
+
+ rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
+
+ rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
+
+ rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
+
+ rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
+
+ rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
+
+ rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
+
+ rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
+}
+
+//------------------------------------------------------------------------------
+// Request & Response
+//------------------------------------------------------------------------------
+
+message ProviderLoadedRequest {
+
+ BrokerInfo broker = 1;
+}
+
+message LoadedResponse {
+
+ repeated HookSpec hooks = 1;
+}
+
+message ProviderUnloadedRequest { }
+
+message ClientConnectRequest {
+
+ ConnInfo conninfo = 1;
+
+ // MQTT CONNECT packet's properties (MQTT v5.0)
+ //
+ // It should be empty on MQTT v3.1.1/v3.1 or others protocol
+ repeated Property props = 2;
+}
+
+message ClientConnackRequest {
+
+ ConnInfo conninfo = 1;
+
+ string result_code = 2;
+
+ repeated Property props = 3;
+}
+
+message ClientConnectedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message ClientDisconnectedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string reason = 2;
+}
+
+message ClientAuthenticateRequest {
+
+ ClientInfo clientinfo = 1;
+
+ bool result = 2;
+}
+
+message ClientCheckAclRequest {
+
+ ClientInfo clientinfo = 1;
+
+ enum AclReqType {
+
+ PUBLISH = 0;
+
+ SUBSCRIBE = 1;
+ }
+
+ AclReqType type = 2;
+
+ string topic = 3;
+
+ bool result = 4;
+}
+
+message ClientSubscribeRequest {
+
+ ClientInfo clientinfo = 1;
+
+ repeated Property props = 2;
+
+ repeated TopicFilter topic_filters = 3;
+}
+
+message ClientUnsubscribeRequest {
+
+ ClientInfo clientinfo = 1;
+
+ repeated Property props = 2;
+
+ repeated TopicFilter topic_filters = 3;
+}
+
+message SessionCreatedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionSubscribedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string topic = 2;
+
+ SubOpts subopts = 3;
+}
+
+message SessionUnsubscribedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string topic = 2;
+}
+
+message SessionResumedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionDiscardedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionTakeoveredRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionTerminatedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string reason = 2;
+}
+
+message MessagePublishRequest {
+
+ Message message = 1;
+}
+
+message MessageDeliveredRequest {
+
+ ClientInfo clientinfo = 1;
+
+ Message message = 2;
+}
+
+message MessageDroppedRequest {
+
+ Message message = 1;
+
+ string reason = 2;
+}
+
+message MessageAckedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ Message message = 2;
+}
+
+//------------------------------------------------------------------------------
+// Basic data types
+//------------------------------------------------------------------------------
+
+message EmptySuccess { }
+
+message ValuedResponse {
+
+ // The responsed value type
+ // - contiune: Use the responsed value and execute the next hook
+ // - ignore: Ignore the responsed value
+ // - stop_and_return: Use the responsed value and stop the chain executing
+ enum ResponsedType {
+
+ CONTINUE = 0;
+
+ IGNORE = 1;
+
+ STOP_AND_RETURN = 2;
+ }
+
+ ResponsedType type = 1;
+
+ oneof value {
+
+ // Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks
+ bool bool_result = 3;
+
+ // Message result, used on the 'message.*' hooks
+ Message message = 4;
+ }
+}
+
+message BrokerInfo {
+
+ string version = 1;
+
+ string sysdescr = 2;
+
+ string uptime = 3;
+
+ string datetime = 4;
+}
+
+message HookSpec {
+
+ // The registered hooks name
+ //
+ // Available value:
+ // "client.connect", "client.connack"
+ // "client.connected", "client.disconnected"
+ // "client.authenticate", "client.check_acl"
+ // "client.subscribe", "client.unsubscribe"
+ //
+ // "session.created", "session.subscribed"
+ // "session.unsubscribed", "session.resumed"
+ // "session.discarded", "session.takeovered"
+ // "session.terminated"
+ //
+ // "message.publish", "message.delivered"
+ // "message.acked", "message.dropped"
+ string name = 1;
+
+ // The topic filters for message hooks
+ repeated string topics = 2;
+}
+
+message ConnInfo {
+
+ string node = 1;
+
+ string clientid = 2;
+
+ string username = 3;
+
+ string peerhost = 4;
+
+ uint32 sockport = 5;
+
+ string proto_name = 6;
+
+ string proto_ver = 7;
+
+ uint32 keepalive = 8;
+}
+
+message ClientInfo {
+
+ string node = 1;
+
+ string clientid = 2;
+
+ string username = 3;
+
+ string password = 4;
+
+ string peerhost = 5;
+
+ uint32 sockport = 6;
+
+ string protocol = 7;
+
+ string mountpoint = 8;
+
+ bool is_superuser = 9;
+
+ bool anonymous = 10;
+
+ // common name of client TLS cert
+ string cn = 11;
+
+ // subject of client TLS cert
+ string dn = 12;
+}
+
+message Message {
+
+ string node = 1;
+
+ string id = 2;
+
+ uint32 qos = 3;
+
+ string from = 4;
+
+ string topic = 5;
+
+ bytes payload = 6;
+
+ uint64 timestamp = 7;
+
+ // The key of header can be:
+ // - username:
+ // * Readonly
+ // * The username of sender client
+ // * Value type: utf8 string
+ // - protocol:
+ // * Readonly
+ // * The protocol name of sender client
+ // * Value type: string enum with "mqtt", "mqtt-sn", ...
+ // - peerhost:
+ // * Readonly
+ // * The peerhost of sender client
+ // * Value type: ip address string
+ // - allow_publish:
+ // * Writable
+ // * Whether to allow the message to be published by emqx
+ // * Value type: string enum with "true", "false", default is "true"
+ //
+ // Notes: All header may be missing, which means that the message does not
+ // carry these headers. We can guarantee that clients coming from MQTT,
+ // MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
+ // carry these headers, but there is no guarantee that messages published
+ // by other means will do, e.g. messages published by HTTP-API
+ map headers = 8;
+}
+
+message Property {
+
+ string name = 1;
+
+ string value = 2;
+}
+
+message TopicFilter {
+
+ string name = 1;
+
+ uint32 qos = 2;
+}
+
+message SubOpts {
+
+ // The QoS level
+ uint32 qos = 1;
+
+ // The group name for shared subscription
+ string share = 2;
+
+ // The Retain Handling option (MQTT v5.0)
+ //
+ // 0 = Send retained messages at the time of the subscribe
+ // 1 = Send retained messages at subscribe only if the subscription does
+ // not currently exist
+ // 2 = Do not send retained messages at the time of the subscribe
+ uint32 rh = 3;
+
+ // The Retain as Published option (MQTT v5.0)
+ //
+ // If 1, Application Messages forwarded using this subscription keep the
+ // RETAIN flag they were published with.
+ // If 0, Application Messages forwarded using this subscription have the
+ // RETAIN flag set to 0.
+ // Retained messages sent when the subscription is established have the RETAIN flag set to 1.
+ uint32 rap = 4;
+
+ // The No Local option (MQTT v5.0)
+ //
+ // If the value is 1, Application Messages MUST NOT be forwarded to a
+ // connection with a ClientID equal to the ClientID of the publishing
+ uint32 nl = 5;
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml
new file mode 100644
index 000000000..958aaf3fd
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/pom.xml
@@ -0,0 +1,111 @@
+
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ emqx-extensions
+ 6.0.13-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ io.entgra.device.mgt.plugins.emqx.initializer
+ bundle
+ EMQX Extensions - Extension Hook Initializer
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ io.entgra.device.mgt.plugins.emqx.exhook
+ jar-with-dependencies
+
+
+ org.wso2.carbon
+ org.wso2.carbon.core
+ provided
+
+
+ org.ops4j.pax.logging
+ pax-logging-api
+ provided
+
+
+
+
+
+ org.apache.maven.wagon
+ wagon-ssh
+ 2.1
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.2
+
+
+ ${wso2.maven.compiler.target}
+
+
+
+ org.apache.felix
+ maven-scr-plugin
+ 1.7.2
+
+
+ generate-scr-scrdescriptor
+
+ scr
+
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 1.4.0
+ true
+
+
+ ${project.artifactId}
+ ${project.artifactId}
+ ${project.version}
+ io.entgra.device.mgt.plugins.emqx.initializer
+ io.entgra.device.mgt.plugins.emqx.initializer.internal
+
+ io.entgra.device.mgt.plugins.emqx.exhook,
+ io.entgra.device.mgt.plugins.emqx.initializer;resolution:=optional,
+ org.apache.commons.logging,
+ org.wso2.carbon.core,
+ org.osgi.framework.*;version="${imp.package.version.osgi.framework}",
+ org.osgi.service.*;version="${imp.package.version.osgi.service}",
+
+
+ !io.entgra.device.mgt.plugins.emqx.initializer.internal,
+ io.entgra.device.mgt.plugins.emqx.initializer.*
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java
new file mode 100644
index 000000000..9c3ab4e4f
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookDataHolder.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.entgra.device.mgt.plugins.emqx.initializer;
+
+public class EmqxExhookDataHolder {
+
+ private static final EmqxExhookDataHolder thisInstance = new EmqxExhookDataHolder();
+ public static EmqxExhookDataHolder getInstance() {
+ return thisInstance;
+ }
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java
new file mode 100644
index 000000000..cff5f0794
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/EmqxExhookServiceComponent.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.entgra.device.mgt.plugins.emqx.initializer;
+
+import io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookInitializer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.core.ServerShutdownHandler;
+import org.wso2.carbon.core.ServerStartupObserver;
+
+/**
+ * @scr.component name="io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookServiceComponent"
+ * immediate="true"
+ */
+public class EmqxExhookServiceComponent {
+ private static final Log log = LogFactory.getLog(EmqxExhookServiceComponent.class);
+
+ protected void activate(ComponentContext ctx) {
+ try {
+
+ EmqxExhookInitializer initializer = new EmqxExhookInitializer();
+ BundleContext bundleContext = ctx.getBundleContext();
+
+ bundleContext.registerService(ServerStartupObserver.class.getName(), initializer, null);
+ bundleContext.registerService(ServerShutdownHandler.class.getName(), initializer, null);
+
+ if (log.isDebugEnabled()) {
+ log.debug("EmqxExhookServiceComponent has been successfully activated");
+ }
+ } catch (Throwable e) {
+ log.error("Error occurred while activating EmqxExhookServiceComponent", e);
+ }
+
+ }
+
+ protected void deactivate(ComponentContext ctx) {
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("EmqxExhookServiceComponent has been successfully de-activated");
+ }
+ } catch (Throwable e) {
+ log.error("Error occurred while de-activating EmqxExhookServiceComponent", e);
+ }
+ }
+}
diff --git a/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java
new file mode 100644
index 000000000..a6c7e5184
--- /dev/null
+++ b/components/extensions/emqx-extensions/io.entgra.device.mgt.plugins.emqx.initializer/src/main/java/io/entgra/device/mgt/plugins/emqx/initializer/internal/EmqxExhookInitializer.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
+ *
+ * Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.entgra.device.mgt.plugins.emqx.initializer.internal;
+
+
+import io.entgra.device.mgt.plugins.emqx.exhook.ExServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.core.ServerShutdownHandler;
+import org.wso2.carbon.core.ServerStartupObserver;
+
+import java.io.IOException;
+
+public class EmqxExhookInitializer implements ServerShutdownHandler, ServerStartupObserver {
+ ExServer exServer = null;
+
+ private static final Log log = LogFactory.getLog(EmqxExhookInitializer.class);
+
+ @Override
+ public void completingServerStartup() {
+
+ }
+
+ @Override
+ public void completedServerStartup() {
+ log.info("completedServerStartup() ");
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ exServer = new ExServer();
+ try {
+ exServer.start();
+ exServer.blockUntilShutdown();
+ } catch (IOException e) {
+ log.error("Error while starting the EMQX extension server");
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ log.error("Error while blocking until shutdown");
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ new Thread(r).start();
+ }
+
+ @Override
+ public void invoke() {
+ try {
+ exServer.stop();
+ } catch (InterruptedException e) {
+ log.error("Error while stopping the EMQX Extension server");
+
+ }
+ }
+}
diff --git a/components/extensions/emqx-extensions/pom.xml b/components/extensions/emqx-extensions/pom.xml
new file mode 100644
index 000000000..35cb7afab
--- /dev/null
+++ b/components/extensions/emqx-extensions/pom.xml
@@ -0,0 +1,39 @@
+
+
+
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ extensions
+ 6.0.13-SNAPSHOT
+ ../pom.xml
+
+
+ 4.0.0
+ emqx-extensions
+ pom
+ EMQX Extensions
+
+
+ io.entgra.device.mgt.plugins.emqx.exhook
+ io.entgra.device.mgt.plugins.emqx.initializer
+
+
+
diff --git a/components/extensions/pom.xml b/components/extensions/pom.xml
index 86ccec15b..0dd38cbe8 100644
--- a/components/extensions/pom.xml
+++ b/components/extensions/pom.xml
@@ -36,6 +36,7 @@
cdmf-transport-adapters
pull-notification-listeners
+ emqx-extensions
diff --git a/pom.xml b/pom.xml
index b826ea75f..264626acb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1013,6 +1013,24 @@
org.wso2.carbon.andes.extensions.device.mgt.api
${carbon.devicemgt.plugins.version}
+
+
+
+ org.wso2.carbon.devicemgt-plugins
+ io.entgra.device.mgt.plugins.emqx.exhook
+ jar-with-dependencies
+ ${carbon.devicemgt.plugins.version}
+
+
+ org.wso2.carbon.devicemgt-plugins
+ io.entgra.device.mgt.plugins.emqx.initializer
+ ${carbon.devicemgt.plugins.version}
+
+
+ org.ops4j.pax.logging
+ pax-logging-api
+ ${pax.logging.api.version}
+
commons-lang
commons-lang
@@ -1117,6 +1135,35 @@
google-api-services-androidenterprise
v1-rev214-1.25.0
+
+
+
+ io.grpc
+ grpc-netty-shaded
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+ org.apache.tomcat
+ annotations-api
+ ${tomcat-annotations-api}
+ provided
+
+
+ commons-lang.wso2
+ commons-lang
+ ${commons-lang-wso2}
+ provided
+
@@ -1140,7 +1187,7 @@
4.6.2
[4.5.0, 5.0.0)
4.4.9
-
+ 1.11.2
5.1.2
@@ -1307,6 +1354,10 @@
[1.6.0, 2.0.0)
[1.2.0,1.3.0)
+
+ 1.51.0
+ 6.0.53
+ 2.6.0.wso2v1
@@ -1404,6 +1455,7 @@
+ org.apache.maven.plugins
maven-assembly-plugin
2.2-beta-2