emqx exhook and initializer

fix-test-failure
Amalka Subasinghe 2 years ago
parent 193a113a81
commit 11785fa4bb

@ -0,0 +1,130 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>emqx-extensions</artifactId>
<version>6.0.13-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>io.entgra.device.mgt.plugins.emqx.exhook</artifactId>
<packaging>jar</packaging>
<name>EMQX Extensions - Extension Hook</name>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang.wso2</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>io.entgra.device.mgt.plugins.emqx.exhook.ExServer</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,477 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.exhook;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.config.keymanager.KeyManagerConfigurations;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class ExServer {
private static final Log logger = LogFactory.getLog(ExServer.class.getName());
private static Map<String, String> accessTokenMap = new HashMap<>();
private static Map<String, String> authorizedScopeMap = new HashMap<>();
private Server server;
public ExServer() {
}
public void start() throws IOException {
/* The port on which the server should run */
int port = 9000;
server = ServerBuilder.forPort(port)
.addService(new HookProviderImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
ExServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final ExServer server = new ExServer();
server.start();
server.blockUntilShutdown();
}
static class HookProviderImpl extends HookProviderGrpc.HookProviderImplBase {
public void DEBUG(String fn, Object req) {
System.out.printf(fn + ", request: " + req);
}
@Override
public void onProviderLoaded(ProviderLoadedRequest request, StreamObserver<LoadedResponse> responseObserver) {
DEBUG("onProviderLoaded", request);
HookSpec[] specs = {
HookSpec.newBuilder().setName("client.connect").build(),
HookSpec.newBuilder().setName("client.connack").build(),
HookSpec.newBuilder().setName("client.connected").build(),
HookSpec.newBuilder().setName("client.disconnected").build(),
HookSpec.newBuilder().setName("client.authenticate").build(),
HookSpec.newBuilder().setName("client.check_acl").build(),
HookSpec.newBuilder().setName("client.subscribe").build(),
HookSpec.newBuilder().setName("client.unsubscribe").build(),
HookSpec.newBuilder().setName("session.created").build(),
HookSpec.newBuilder().setName("session.subscribed").build(),
HookSpec.newBuilder().setName("session.unsubscribed").build(),
HookSpec.newBuilder().setName("session.resumed").build(),
HookSpec.newBuilder().setName("session.discarded").build(),
HookSpec.newBuilder().setName("session.takeovered").build(),
HookSpec.newBuilder().setName("session.terminated").build(),
HookSpec.newBuilder().setName("message.publish").build(),
HookSpec.newBuilder().setName("message.delivered").build(),
HookSpec.newBuilder().setName("message.acked").build(),
HookSpec.newBuilder().setName("message.dropped").build()
};
LoadedResponse reply = LoadedResponse.newBuilder().addAllHooks(Arrays.asList(specs)).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onProviderUnloaded(ProviderUnloadedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onProviderUnloaded", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientConnect(ClientConnectRequest request, StreamObserver<EmptySuccess> responseObserver) {
logger.info("onClientConnect -----------------------------");
DEBUG("onClientConnect", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientConnack(ClientConnackRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onClientConnack", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientConnected(ClientConnectedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onClientConnected", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientDisconnected(ClientDisconnectedRequest request, StreamObserver<EmptySuccess> responseObserver) {
logger.info("onClientDisconnected -----------------------------");
DEBUG("onClientDisconnected", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientAuthenticate(ClientAuthenticateRequest request, StreamObserver<ValuedResponse> responseObserver) {
DEBUG("onClientAuthenticate", request);
if (!StringUtils.isEmpty(request.getClientinfo().getUsername()) &&
!StringUtils.isEmpty(request.getClientinfo().getPassword())) {
DEBUG("on access token passes", request);
try {
String accessToken = request.getClientinfo().getUsername() + "-" + request.getClientinfo().getPassword();
KeyManagerConfigurations keyManagerConfig = DeviceConfigurationManager.getInstance()
.getDeviceManagementConfig().getKeyManagerConfigurations();
HttpPost tokenEndpoint = new HttpPost(
keyManagerConfig.getServerUrl() + HandlerConstants.INTROSPECT_ENDPOINT);
tokenEndpoint.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString());
tokenEndpoint.setHeader(HttpHeaders.AUTHORIZATION, HandlerConstants.BASIC + Base64.getEncoder()
.encodeToString((keyManagerConfig.getAdminUsername() + HandlerConstants.COLON
+ keyManagerConfig.getAdminPassword()).getBytes()));
StringEntity tokenEPPayload = new StringEntity("token=" + accessToken,
ContentType.APPLICATION_FORM_URLENCODED);
tokenEndpoint.setEntity(tokenEPPayload);
ProxyResponse tokenStatus = HandlerUtil.execute(tokenEndpoint);
if (tokenStatus.getExecutorResponse().contains(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX)) {
if (tokenStatus.getCode() == HttpStatus.SC_UNAUTHORIZED) {
// return with error
logger.error("Unauthorized");
responseObserver.onError(new Exception("unauthorized"));
return;
} else {
// return with error
logger.error("error occurred while checking access token");
responseObserver.onError(new Exception("error occurred while checking access token"));
return;
}
}
String tokenData = tokenStatus.getData();
if (tokenData == null) {
// return with error
logger.error("invalid token data is received");
responseObserver.onError(new Exception("invalid token data is received"));
return;
}
JsonParser jsonParser = new JsonParser();
JsonElement jTokenResult = jsonParser.parse(tokenData);
if (jTokenResult.isJsonObject()) {
JsonObject jTokenResultAsJsonObject = jTokenResult.getAsJsonObject();
if (!jTokenResultAsJsonObject.get("active").getAsBoolean()) {
logger.error("access token is expired");
responseObserver.onError(new Exception("access token is expired"));
return;
}
// success
accessTokenMap.put(request.getClientinfo().getClientid(), accessToken);
authorizedScopeMap.put(accessToken, jTokenResultAsJsonObject.get("scope").getAsString());
logger.info("authenticated");
ValuedResponse reply = ValuedResponse.newBuilder()
.setBoolResult(true)
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
// } else {
// ValuedResponse reply = ValuedResponse.newBuilder()
// .setBoolResult(true)
// .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
// .build();
// responseObserver.onNext(reply);
// responseObserver.onCompleted();
}
}
@Override
public void onClientCheckAcl(ClientCheckAclRequest request, StreamObserver<ValuedResponse> responseObserver) {
DEBUG("onClientCheckAcl", request);
/*
carbon.super/deviceType/deviceId
data/carbon.super/deviceType/deviceId
data/carbonsuper/deviceType/deviceId
republished/deviceType
*/
if (!StringUtils.isEmpty(request.getClientinfo().getUsername()) &&
StringUtils.isEmpty(request.getClientinfo().getPassword())) {
//todo: check token validity
String accessToken = accessTokenMap.get(request.getClientinfo().getClientid());
if (StringUtils.isEmpty(accessToken) || !accessToken.startsWith(request.getClientinfo().getUsername())) {
logger.info("Valid access token not found");
responseObserver.onError(new Exception("not authorized"));
}
String authorizedScopeList = authorizedScopeMap.get(accessToken);
String[] scopeArray = authorizedScopeList.split(" ");
List<String> scopeList = Arrays.asList(scopeArray);
boolean isFound = false;
String tempScope = null;
if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE) ||
request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
String requestTopic = request.getTopic();
if (requestTopic.endsWith("/#")) {
requestTopic = requestTopic.substring(0, requestTopic.indexOf("/#"));
}
// replace / with :
requestTopic = requestTopic.replace("/", ":");
if (request.getType().equals(ClientCheckAclRequest.AclReqType.SUBSCRIBE)) {
tempScope = "perm:topic:sub:" + requestTopic;
}
if (request.getType().equals(ClientCheckAclRequest.AclReqType.PUBLISH)) {
tempScope = "perm:topic:pub:" + requestTopic;
}
if (scopeList.contains(tempScope)) {
isFound = true;
}
if (isFound) {
ValuedResponse reply = ValuedResponse.newBuilder()
.setBoolResult(true)
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} else {
responseObserver.onError(new Exception("not authorized"));
}
}
} else {
//default
ValuedResponse reply = ValuedResponse.newBuilder()
.setBoolResult(true)
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
@Override
public void onClientSubscribe(ClientSubscribeRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onClientSubscribe", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientUnsubscribe(ClientUnsubscribeRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onClientUnsubscribe", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionCreated(SessionCreatedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionCreated", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionSubscribed(SessionSubscribedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionSubscribed", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionUnsubscribed(SessionUnsubscribedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionUnsubscribed", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionResumed(SessionResumedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionResumed", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionDiscarded(SessionDiscardedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionDdiscarded", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionTakeovered(SessionTakeoveredRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionTakeovered", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionTerminated(SessionTerminatedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onSessionTerminated", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) {
DEBUG("onMessagePublish", request);
ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)");
Message nmsg = Message.newBuilder()
.setId (request.getMessage().getId())
.setNode (request.getMessage().getNode())
.setFrom (request.getMessage().getFrom())
.setTopic (request.getMessage().getTopic())
.setPayload(((GeneratedMessageV3) request).toByteString()).build();
ValuedResponse reply = ValuedResponse.newBuilder()
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
.setMessage(nmsg).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
// case2: stop publish the 't/d' messages
// @Override
// public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) {
// DEBUG("onMessagePublish", request);
//
// Message nmsg = request.getMessage();
// if ("t/d".equals(nmsg.getTopic())) {
// ByteString bstr = ByteString.copyFromUtf8("");
// nmsg = Message.newBuilder()
// .setId (request.getMessage().getId())
// .setNode (request.getMessage().getNode())
// .setFrom (request.getMessage().getFrom())
// .setTopic (request.getMessage().getTopic())
// .setPayload(bstr)
// .putHeaders("allow_publish", "false").build();
// }
//
// ValuedResponse reply = ValuedResponse.newBuilder()
// .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
// .setMessage(nmsg).build();
// responseObserver.onNext(reply);
// responseObserver.onCompleted();
// }
@Override
public void onMessageDelivered(MessageDeliveredRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onMessageDelivered", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onMessageAcked(MessageAckedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onMessageAcked", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onMessageDropped(MessageDroppedRequest request, StreamObserver<EmptySuccess> responseObserver) {
DEBUG("onMessageDropped", request);
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}

@ -0,0 +1,28 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.exhook;
public class HandlerConstants {
public static final String INTROSPECT_ENDPOINT = "/oauth2/introspect";
public static final String BASIC = "Basic ";
public static final String EXECUTOR_EXCEPTION_PREFIX = "ExecutorException-";
public static final String TOKEN_IS_EXPIRED = "ACCESS_TOKEN_IS_EXPIRED";
public static final String COLON = ":";
public static final int INTERNAL_ERROR_CODE = 500;
}

@ -0,0 +1,280 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.exhook;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.*;
import java.security.*;
import java.security.cert.CertificateException;
public class HandlerUtil {
private static KeyStore keyStore;
private static KeyStore trustStore;
private static char[] keyStorePassword;
private static SSLContext sslContext;
private static final String KEY_STORE_TYPE = "JKS";
/**
* Default truststore type of the client
*/
private static final String TRUST_STORE_TYPE = "JKS";
private static final String KEY_MANAGER_TYPE = "SunX509"; //Default Key Manager Type
/**
* Default trustmanager type of the client
*/
private static final String TRUST_MANAGER_TYPE = "SunX509"; //Default Trust Manager Type
private static final String SSLV3 = "SSLv3";
private static final Log log = LogFactory.getLog(HandlerUtil.class);
/***
*
* @param httpRequest - httpMethod e.g:- HttpPost, HttpGet
* @return response as string
* @throws IOException IO exception returns if error occurs when executing the httpMethod
*/
public static ProxyResponse execute(HttpRequestBase httpRequest) throws IOException {
try (CloseableHttpClient client = getHttpClient()) {
HttpResponse response = client.execute(httpRequest);
ProxyResponse proxyResponse = new ProxyResponse();
if (response == null) {
log.error("Received null response for http request : " + httpRequest.getMethod() + " " + httpRequest
.getURI().toString());
proxyResponse.setCode(HandlerConstants.INTERNAL_ERROR_CODE);
proxyResponse.setStatus(ProxyResponse.Status.ERROR);
proxyResponse.setExecutorResponse(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(
HandlerConstants.INTERNAL_ERROR_CODE));
return proxyResponse;
} else {
int statusCode = response.getStatusLine().getStatusCode();
String jsonString = getResponseString(response);
if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_CREATED) {
proxyResponse.setCode(statusCode);
proxyResponse.setData(jsonString);
proxyResponse.setStatus(ProxyResponse.Status.SUCCESS);
proxyResponse.setExecutorResponse("SUCCESS");
proxyResponse.setHeaders(response.getAllHeaders());
return proxyResponse;
} else if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
if (isTokenExpired(jsonString)) {
proxyResponse.setCode(statusCode);
proxyResponse.setStatus(ProxyResponse.Status.ERROR);
proxyResponse.setExecutorResponse(HandlerConstants.TOKEN_IS_EXPIRED);
} else {
log.error(
"Received " + statusCode + " response for http request : " + httpRequest.getMethod()
+ " " + httpRequest.getURI().toString() + ". Error message: " + jsonString);
proxyResponse.setCode(statusCode);
proxyResponse.setData(jsonString);
proxyResponse.setStatus(ProxyResponse.Status.ERROR);
proxyResponse.setExecutorResponse(
HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(statusCode));
}
return proxyResponse;
}
log.error("Received " + statusCode +
" response for http request : " + httpRequest.getMethod() + " " + httpRequest.getURI()
.toString() + ". Error message: " + jsonString);
proxyResponse.setCode(statusCode);
proxyResponse.setData(jsonString);
proxyResponse.setStatus(ProxyResponse.Status.ERROR);
proxyResponse
.setExecutorResponse(HandlerConstants.EXECUTOR_EXCEPTION_PREFIX + getStatusKey(statusCode));
return proxyResponse;
}
}
}
public static boolean isTokenExpired(String jsonBody) {
return jsonBody.contains("Access token expired") || jsonBody
.contains("Invalid input. Access token validation failed");
}
/***
*
* @param statusCode Provide status code, e.g:- 400, 401, 500 etc
* @return relative status code key for given status code.
*/
public static String getStatusKey(int statusCode) {
String statusCodeKey;
switch (statusCode) {
case HttpStatus.SC_INTERNAL_SERVER_ERROR:
statusCodeKey = "internalServerError";
break;
case HttpStatus.SC_BAD_REQUEST:
statusCodeKey = "badRequest";
break;
case HttpStatus.SC_UNAUTHORIZED:
statusCodeKey = "unauthorized";
break;
case HttpStatus.SC_FORBIDDEN:
statusCodeKey = "forbidden";
break;
case HttpStatus.SC_NOT_FOUND:
statusCodeKey = "notFound";
break;
case HttpStatus.SC_METHOD_NOT_ALLOWED:
statusCodeKey = "methodNotAllowed";
break;
case HttpStatus.SC_NOT_ACCEPTABLE:
statusCodeKey = "notAcceptable";
break;
case HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE:
statusCodeKey = "unsupportedMediaType";
break;
default:
statusCodeKey = "defaultPage";
break;
}
return statusCodeKey;
}
/**
* Retrieve Http client based on hostname verification.
*
* @return {@link CloseableHttpClient} http client
*/
public static CloseableHttpClient getHttpClient() {
boolean isIgnoreHostnameVerification = Boolean.parseBoolean(System.
getProperty("org.wso2.ignoreHostnameVerification"));
if (isIgnoreHostnameVerification) {
return HttpClients.custom().setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE).build();
} else {
return HttpClients.createDefault();
}
// String keyStorePassword = "wso2carbon";
// String trustStorePassword = "wso2carbon";
// String keyStoreLocation = "/home/amalka/entgra/source/product-switchgear/switchgear-plugin/integration-test/distro-prep/target/entgra-iot-pro-switchgear-repacked-1.0.0-SNAPSHOT/repository/resources/security/wso2carbon.jks";
// String trustStoreLocation = "/home/amalka/entgra/source/product-switchgear/switchgear-plugin/integration-test/distro-prep/target/entgra-iot-pro-switchgear-repacked-1.0.0-SNAPSHOT/repository/resources/security/client-truststore.jks";
//
// //Call to load the keystore.
// try {
// loadKeyStore(keyStoreLocation, keyStorePassword);
// } catch (IOException e) {
// throw new RuntimeException(e);
// } catch (CertificateException e) {
// throw new RuntimeException(e);
// } catch (NoSuchAlgorithmException e) {
// throw new RuntimeException(e);
// } catch (KeyStoreException e) {
// throw new RuntimeException(e);
// }
// //Call to load the TrustStore.
// try {
// loadTrustStore(trustStoreLocation, trustStorePassword);
// } catch (KeyStoreException e) {
// throw new RuntimeException(e);
// } catch (IOException e) {
// throw new RuntimeException(e);
// } catch (CertificateException e) {
// throw new RuntimeException(e);
// } catch (NoSuchAlgorithmException e) {
// throw new RuntimeException(e);
// }
// //Create the SSL context with the loaded TrustStore/keystore.
// try {
// initSSLConnection();
// } catch (NoSuchAlgorithmException e) {
// throw new RuntimeException(e);
// } catch (UnrecoverableKeyException e) {
// throw new RuntimeException(e);
// } catch (KeyStoreException e) {
// throw new RuntimeException(e);
// } catch (KeyManagementException e) {
// throw new RuntimeException(e);
// }
//
// return HttpClients.createDefault();
}
private static void loadKeyStore(String keyStorePath, String ksPassword)
throws IOException, CertificateException, NoSuchAlgorithmException, KeyStoreException {
InputStream fis = null;
try {
keyStorePassword = ksPassword.toCharArray();
keyStore = KeyStore.getInstance(KEY_STORE_TYPE);
fis = new FileInputStream(keyStorePath);
keyStore.load(fis, keyStorePassword);
} finally {
if (fis != null) {
fis.close();
}
}
}
private static void loadTrustStore(String trustStorePath, String tsPassword)
throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
InputStream fis = null;
try {
trustStore = KeyStore.getInstance(TRUST_STORE_TYPE);
fis = new FileInputStream(trustStorePath);
trustStore.load(fis, tsPassword.toCharArray());
} finally {
if (fis != null) {
fis.close();
}
}
}
private static void initSSLConnection() throws NoSuchAlgorithmException, UnrecoverableKeyException,
KeyStoreException, KeyManagementException {
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KEY_MANAGER_TYPE);
keyManagerFactory.init(keyStore, keyStorePassword);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TRUST_MANAGER_TYPE);
trustManagerFactory.init(trustStore);
// Create and initialize SSLContext for HTTPS communication
sslContext = SSLContext.getInstance(SSLV3);
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
SSLContext.setDefault(sslContext);
}
public static String getResponseString(HttpResponse response) throws IOException {
try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
StringBuilder responseBuilder = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
responseBuilder.append(line);
}
return responseBuilder.toString();
}
}
}

@ -0,0 +1,63 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.exhook;
import org.apache.http.Header;
public class ProxyResponse {
public static class Status {
public static int SUCCESS = 1;
public static int ERROR = 0;
}
private int code;
private String data;
private String executorResponse;
private int status;
private Header[] headers;
public int getCode() { return code; }
public void setCode(int code) { this.code = code; }
public String getData() { return data; }
public void setData(String data) { this.data = data; }
public String getExecutorResponse() { return executorResponse; }
public void setExecutorResponse(String executorResponse) { this.executorResponse = executorResponse; }
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Header[] getHeaders() {
return headers;
}
public void setHeaders(Header[] headers) {
this.headers = headers;
}
}

@ -0,0 +1,434 @@
//
// Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
//
// Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
syntax = "proto3";
option csharp_namespace = "Emqx.Exhook.V1";
option go_package = "emqx.io/grpc/exhook";
option java_multiple_files = true;
option java_package = "io.entgra.device.mgt.plugins.emqx.exhook";
option java_outer_classname = "EmqxExHookProto";
package emqx.exhook.v1;
service HookProvider {
rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}
//------------------------------------------------------------------------------
// Request & Response
//------------------------------------------------------------------------------
message ProviderLoadedRequest {
BrokerInfo broker = 1;
}
message LoadedResponse {
repeated HookSpec hooks = 1;
}
message ProviderUnloadedRequest { }
message ClientConnectRequest {
ConnInfo conninfo = 1;
// MQTT CONNECT packet's properties (MQTT v5.0)
//
// It should be empty on MQTT v3.1.1/v3.1 or others protocol
repeated Property props = 2;
}
message ClientConnackRequest {
ConnInfo conninfo = 1;
string result_code = 2;
repeated Property props = 3;
}
message ClientConnectedRequest {
ClientInfo clientinfo = 1;
}
message ClientDisconnectedRequest {
ClientInfo clientinfo = 1;
string reason = 2;
}
message ClientAuthenticateRequest {
ClientInfo clientinfo = 1;
bool result = 2;
}
message ClientCheckAclRequest {
ClientInfo clientinfo = 1;
enum AclReqType {
PUBLISH = 0;
SUBSCRIBE = 1;
}
AclReqType type = 2;
string topic = 3;
bool result = 4;
}
message ClientSubscribeRequest {
ClientInfo clientinfo = 1;
repeated Property props = 2;
repeated TopicFilter topic_filters = 3;
}
message ClientUnsubscribeRequest {
ClientInfo clientinfo = 1;
repeated Property props = 2;
repeated TopicFilter topic_filters = 3;
}
message SessionCreatedRequest {
ClientInfo clientinfo = 1;
}
message SessionSubscribedRequest {
ClientInfo clientinfo = 1;
string topic = 2;
SubOpts subopts = 3;
}
message SessionUnsubscribedRequest {
ClientInfo clientinfo = 1;
string topic = 2;
}
message SessionResumedRequest {
ClientInfo clientinfo = 1;
}
message SessionDiscardedRequest {
ClientInfo clientinfo = 1;
}
message SessionTakeoveredRequest {
ClientInfo clientinfo = 1;
}
message SessionTerminatedRequest {
ClientInfo clientinfo = 1;
string reason = 2;
}
message MessagePublishRequest {
Message message = 1;
}
message MessageDeliveredRequest {
ClientInfo clientinfo = 1;
Message message = 2;
}
message MessageDroppedRequest {
Message message = 1;
string reason = 2;
}
message MessageAckedRequest {
ClientInfo clientinfo = 1;
Message message = 2;
}
//------------------------------------------------------------------------------
// Basic data types
//------------------------------------------------------------------------------
message EmptySuccess { }
message ValuedResponse {
// The responsed value type
// - contiune: Use the responsed value and execute the next hook
// - ignore: Ignore the responsed value
// - stop_and_return: Use the responsed value and stop the chain executing
enum ResponsedType {
CONTINUE = 0;
IGNORE = 1;
STOP_AND_RETURN = 2;
}
ResponsedType type = 1;
oneof value {
// Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks
bool bool_result = 3;
// Message result, used on the 'message.*' hooks
Message message = 4;
}
}
message BrokerInfo {
string version = 1;
string sysdescr = 2;
string uptime = 3;
string datetime = 4;
}
message HookSpec {
// The registered hooks name
//
// Available value:
// "client.connect", "client.connack"
// "client.connected", "client.disconnected"
// "client.authenticate", "client.check_acl"
// "client.subscribe", "client.unsubscribe"
//
// "session.created", "session.subscribed"
// "session.unsubscribed", "session.resumed"
// "session.discarded", "session.takeovered"
// "session.terminated"
//
// "message.publish", "message.delivered"
// "message.acked", "message.dropped"
string name = 1;
// The topic filters for message hooks
repeated string topics = 2;
}
message ConnInfo {
string node = 1;
string clientid = 2;
string username = 3;
string peerhost = 4;
uint32 sockport = 5;
string proto_name = 6;
string proto_ver = 7;
uint32 keepalive = 8;
}
message ClientInfo {
string node = 1;
string clientid = 2;
string username = 3;
string password = 4;
string peerhost = 5;
uint32 sockport = 6;
string protocol = 7;
string mountpoint = 8;
bool is_superuser = 9;
bool anonymous = 10;
// common name of client TLS cert
string cn = 11;
// subject of client TLS cert
string dn = 12;
}
message Message {
string node = 1;
string id = 2;
uint32 qos = 3;
string from = 4;
string topic = 5;
bytes payload = 6;
uint64 timestamp = 7;
// The key of header can be:
// - username:
// * Readonly
// * The username of sender client
// * Value type: utf8 string
// - protocol:
// * Readonly
// * The protocol name of sender client
// * Value type: string enum with "mqtt", "mqtt-sn", ...
// - peerhost:
// * Readonly
// * The peerhost of sender client
// * Value type: ip address string
// - allow_publish:
// * Writable
// * Whether to allow the message to be published by emqx
// * Value type: string enum with "true", "false", default is "true"
//
// Notes: All header may be missing, which means that the message does not
// carry these headers. We can guarantee that clients coming from MQTT,
// MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
// carry these headers, but there is no guarantee that messages published
// by other means will do, e.g. messages published by HTTP-API
map<string, string> headers = 8;
}
message Property {
string name = 1;
string value = 2;
}
message TopicFilter {
string name = 1;
uint32 qos = 2;
}
message SubOpts {
// The QoS level
uint32 qos = 1;
// The group name for shared subscription
string share = 2;
// The Retain Handling option (MQTT v5.0)
//
// 0 = Send retained messages at the time of the subscribe
// 1 = Send retained messages at subscribe only if the subscription does
// not currently exist
// 2 = Do not send retained messages at the time of the subscribe
uint32 rh = 3;
// The Retain as Published option (MQTT v5.0)
//
// If 1, Application Messages forwarded using this subscription keep the
// RETAIN flag they were published with.
// If 0, Application Messages forwarded using this subscription have the
// RETAIN flag set to 0.
// Retained messages sent when the subscription is established have the RETAIN flag set to 1.
uint32 rap = 4;
// The No Local option (MQTT v5.0)
//
// If the value is 1, Application Messages MUST NOT be forwarded to a
// connection with a ClientID equal to the ClientID of the publishing
uint32 nl = 5;
}

@ -0,0 +1,111 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>emqx-extensions</artifactId>
<version>6.0.13-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>io.entgra.device.mgt.plugins.emqx.initializer</artifactId>
<packaging>bundle</packaging>
<name>EMQX Extensions - Extension Hook Initializer</name>
<dependencies>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>io.entgra.device.mgt.plugins.emqx.exhook</artifactId>
<classifier>jar-with-dependencies</classifier>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.ops4j.pax.logging</groupId>
<artifactId>pax-logging-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>${wso2.maven.compiler.source}</source>
<target>${wso2.maven.compiler.target}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
<version>1.7.2</version>
<executions>
<execution>
<id>generate-scr-scrdescriptor</id>
<goals>
<goal>scr</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>1.4.0</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Bundle-Version>${project.version}</Bundle-Version>
<Bundle-Description>io.entgra.device.mgt.plugins.emqx.initializer</Bundle-Description>
<Private-Package>io.entgra.device.mgt.plugins.emqx.initializer.internal</Private-Package>
<Import-Package>
io.entgra.device.mgt.plugins.emqx.exhook,
io.entgra.device.mgt.plugins.emqx.initializer;resolution:=optional,
org.apache.commons.logging,
org.wso2.carbon.core,
org.osgi.framework.*;version="${imp.package.version.osgi.framework}",
org.osgi.service.*;version="${imp.package.version.osgi.service}",
</Import-Package>
<Export-Package>
!io.entgra.device.mgt.plugins.emqx.initializer.internal,
io.entgra.device.mgt.plugins.emqx.initializer.*
</Export-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,26 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.initializer;
public class EmqxExhookDataHolder {
private static final EmqxExhookDataHolder thisInstance = new EmqxExhookDataHolder();
public static EmqxExhookDataHolder getInstance() {
return thisInstance;
}
}

@ -0,0 +1,62 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.initializer;
import io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookInitializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerShutdownHandler;
import org.wso2.carbon.core.ServerStartupObserver;
/**
* @scr.component name="io.entgra.device.mgt.plugins.emqx.initializer.internal.EmqxExhookServiceComponent"
* immediate="true"
*/
public class EmqxExhookServiceComponent {
private static final Log log = LogFactory.getLog(EmqxExhookServiceComponent.class);
protected void activate(ComponentContext ctx) {
try {
EmqxExhookInitializer initializer = new EmqxExhookInitializer();
BundleContext bundleContext = ctx.getBundleContext();
bundleContext.registerService(ServerStartupObserver.class.getName(), initializer, null);
bundleContext.registerService(ServerShutdownHandler.class.getName(), initializer, null);
if (log.isDebugEnabled()) {
log.debug("EmqxExhookServiceComponent has been successfully activated");
}
} catch (Throwable e) {
log.error("Error occurred while activating EmqxExhookServiceComponent", e);
}
}
protected void deactivate(ComponentContext ctx) {
try {
if (log.isDebugEnabled()) {
log.debug("EmqxExhookServiceComponent has been successfully de-activated");
}
} catch (Throwable e) {
log.error("Error occurred while de-activating EmqxExhookServiceComponent", e);
}
}
}

@ -0,0 +1,71 @@
/*
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.device.mgt.plugins.emqx.initializer.internal;
import io.entgra.device.mgt.plugins.emqx.exhook.ExServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.core.ServerShutdownHandler;
import org.wso2.carbon.core.ServerStartupObserver;
import java.io.IOException;
public class EmqxExhookInitializer implements ServerShutdownHandler, ServerStartupObserver {
ExServer exServer = null;
private static final Log log = LogFactory.getLog(EmqxExhookInitializer.class);
@Override
public void completingServerStartup() {
}
@Override
public void completedServerStartup() {
log.info("completedServerStartup() ");
Runnable r = new Runnable() {
@Override
public void run() {
exServer = new ExServer();
try {
exServer.start();
exServer.blockUntilShutdown();
} catch (IOException e) {
log.error("Error while starting the EMQX extension server");
throw new RuntimeException(e);
} catch (InterruptedException e) {
log.error("Error while blocking until shutdown");
throw new RuntimeException(e);
}
}
};
new Thread(r).start();
}
@Override
public void invoke() {
try {
exServer.stop();
} catch (InterruptedException e) {
log.error("Error while stopping the EMQX Extension server");
}
}
}

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
* Copyright (c) 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
*
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>extensions</artifactId>
<version>6.0.13-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>emqx-extensions</artifactId>
<packaging>pom</packaging>
<name>EMQX Extensions</name>
<modules>
<module>io.entgra.device.mgt.plugins.emqx.exhook</module>
<module>io.entgra.device.mgt.plugins.emqx.initializer</module>
</modules>
</project>

@ -36,6 +36,7 @@
<module>cdmf-transport-adapters</module>
<!--<module>mb-extensions</module>-->
<module>pull-notification-listeners</module>
<module>emqx-extensions</module>
</modules>
<build>

@ -1013,6 +1013,24 @@
<artifactId>org.wso2.carbon.andes.extensions.device.mgt.api</artifactId>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<!--EMQX Extensions -->
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>io.entgra.device.mgt.plugins.emqx.exhook</artifactId>
<classifier>jar-with-dependencies</classifier>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt-plugins</groupId>
<artifactId>io.entgra.device.mgt.plugins.emqx.initializer</artifactId>
<version>${carbon.devicemgt.plugins.version}</version>
</dependency>
<dependency>
<groupId>org.ops4j.pax.logging</groupId>
<artifactId>pax-logging-api</artifactId>
<version>${pax.logging.api.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
@ -1117,6 +1135,35 @@
<artifactId>google-api-services-androidenterprise</artifactId>
<version>v1-rev214-1.25.0</version>
</dependency>
<!--Dependencies for EMQX-->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>${tomcat-annotations-api}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang.wso2</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang-wso2}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
@ -1140,7 +1187,7 @@
<carbon.kernel.version>4.6.2</carbon.kernel.version>
<carbon.kernel.version.range>[4.5.0, 5.0.0)</carbon.kernel.version.range>
<carbon.logging.version>4.4.9</carbon.logging.version>
<pax.logging.api.version>1.11.2</pax.logging.api.version>
<carbon.p2.plugin.version>5.1.2</carbon.p2.plugin.version>
<!-- Axis2 -->
@ -1307,6 +1354,10 @@
<imp.package.version.osgi.framework>[1.6.0, 2.0.0)</imp.package.version.osgi.framework>
<imp.package.version.osgi.service>[1.2.0,1.3.0)</imp.package.version.osgi.service>
<grpc.version>1.51.0</grpc.version>
<tomcat-annotations-api>6.0.53</tomcat-annotations-api>
<commons-lang-wso2>2.6.0.wso2v1</commons-lang-wso2>
</properties>
<scm>
@ -1404,6 +1455,7 @@
</executions>
</plugin-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-2</version>
</plugin>

Loading…
Cancel
Save