|
|
|
@ -1,12 +1,30 @@
|
|
|
|
|
package org.wso2.carbon.device.mgt.jaxrs.service.impl;
|
|
|
|
|
|
|
|
|
|
import edu.emory.mathcs.backport.java.util.Arrays;
|
|
|
|
|
import org.apache.axis2.AxisFault;
|
|
|
|
|
import org.apache.axis2.client.Stub;
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.apache.http.HttpResponse;
|
|
|
|
|
import org.apache.http.client.methods.HttpPost;
|
|
|
|
|
import org.apache.http.entity.ContentType;
|
|
|
|
|
import org.apache.http.entity.StringEntity;
|
|
|
|
|
import org.apache.velocity.util.ArrayListWrapper;
|
|
|
|
|
import org.json.JSONObject;
|
|
|
|
|
import org.opensaml.xml.signature.J;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminService;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTable;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTableRecord;
|
|
|
|
|
import org.wso2.carbon.base.MultitenantConstants;
|
|
|
|
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
|
|
|
|
import org.wso2.carbon.databridge.commons.StreamDefinition;
|
|
|
|
|
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.core.dto.DeviceType;
|
|
|
|
|
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.Attribute;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.AttributeType;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.DeviceTypeEvent;
|
|
|
|
@ -15,26 +33,50 @@ import org.wso2.carbon.device.mgt.jaxrs.beans.analytics.TransportType;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.service.api.DeviceEventManagementService;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.util.Constants;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils;
|
|
|
|
|
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterConfiguration;
|
|
|
|
|
import org.wso2.carbon.event.input.adapter.core.MessageType;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterConfiguration;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterService;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.rdbms.RDBMSEventAdapter;
|
|
|
|
|
import org.wso2.carbon.event.output.adapter.rdbms.internal.ds.RDBMSEventAdapterServiceDS;
|
|
|
|
|
import org.wso2.carbon.event.processor.manager.core.EventProcessorManagementService;
|
|
|
|
|
import org.wso2.carbon.event.processor.manager.core.EventPublisherManagementService;
|
|
|
|
|
import org.wso2.carbon.event.publisher.core.EventPublisherService;
|
|
|
|
|
import org.wso2.carbon.event.publisher.core.config.EventPublisherConfiguration;
|
|
|
|
|
import org.wso2.carbon.event.publisher.core.config.mapping.JSONOutputMapping;
|
|
|
|
|
import org.wso2.carbon.event.publisher.core.config.mapping.MapOutputMapping;
|
|
|
|
|
import org.wso2.carbon.event.publisher.core.exception.EventPublisherConfigurationException;
|
|
|
|
|
import org.wso2.carbon.event.publisher.core.internal.ds.EventPublisherServiceDS;
|
|
|
|
|
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceCallbackHandler;
|
|
|
|
|
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.event.receiver.core.EventReceiverService;
|
|
|
|
|
import org.wso2.carbon.event.receiver.core.config.EventReceiverConfiguration;
|
|
|
|
|
import org.wso2.carbon.event.receiver.core.config.InputMapping;
|
|
|
|
|
import org.wso2.carbon.event.receiver.core.config.mapping.JSONInputMapping;
|
|
|
|
|
import org.wso2.carbon.event.receiver.core.config.mapping.WSO2EventInputMapping;
|
|
|
|
|
import org.wso2.carbon.event.receiver.core.exception.EventReceiverConfigurationException;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceCallbackHandler;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.types.BasicInputAdapterPropertyDto;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.types.EventReceiverConfigurationDto;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.types.InputAdapterConfigurationDto;
|
|
|
|
|
import org.wso2.carbon.event.stream.core.EventStreamService;
|
|
|
|
|
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.types.EventStreamAttributeDto;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.types.EventStreamDefinitionDto;
|
|
|
|
|
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
|
|
|
|
|
import org.wso2.carbon.user.api.UserStoreException;
|
|
|
|
|
|
|
|
|
|
import javax.ws.rs.DELETE;
|
|
|
|
|
import javax.ws.rs.GET;
|
|
|
|
|
import javax.ws.rs.Path;
|
|
|
|
|
import javax.ws.rs.PathParam;
|
|
|
|
|
import javax.validation.Valid;
|
|
|
|
|
import javax.ws.rs.*;
|
|
|
|
|
import javax.ws.rs.core.Response;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.rmi.RemoteException;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -173,65 +215,73 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
/**
|
|
|
|
|
* Deploy Event Stream, Receiver, Publisher and Store Configuration.
|
|
|
|
|
*/
|
|
|
|
|
// @POST
|
|
|
|
|
// @Path("/{type}")
|
|
|
|
|
// @Override
|
|
|
|
|
// public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType,
|
|
|
|
|
// @QueryParam("skipPersist") boolean skipPersist,
|
|
|
|
|
// @QueryParam("isSharedWithAllTenants") boolean isSharedWithAllTenants,
|
|
|
|
|
// @Valid DeviceTypeEvent deviceTypeEvent) {
|
|
|
|
|
// TransportType transportType = deviceTypeEvent.getTransportType();
|
|
|
|
|
// EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList();
|
|
|
|
|
// String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
// try {
|
|
|
|
|
// if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 ||
|
|
|
|
|
// deviceType == null || transportType == null ||
|
|
|
|
|
// !DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
// String errorMessage = "Invalid Payload";
|
|
|
|
|
// log.error(errorMessage);
|
|
|
|
|
// return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
// }
|
|
|
|
|
// String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
|
// String streamNameWithVersion = streamName + ":" + Constants.DEFAULT_STREAM_VERSION;
|
|
|
|
|
// publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
// publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, isSharedWithAllTenants, deviceType);
|
|
|
|
|
// if (!skipPersist) {
|
|
|
|
|
// publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
|
|
|
|
|
// }
|
|
|
|
|
// publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType);
|
|
|
|
|
// try {
|
|
|
|
|
// PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
|
// PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
|
// MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
|
// if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
|
|
|
|
|
// publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
// publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, isSharedWithAllTenants, deviceType);
|
|
|
|
|
// }
|
|
|
|
|
// } finally {
|
|
|
|
|
// PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
// }
|
|
|
|
|
// return Response.ok().build();
|
|
|
|
|
// } catch (AxisFault e) {
|
|
|
|
|
// log.error("Failed to create event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (RemoteException e) {
|
|
|
|
|
// log.error("Failed to connect with the remote services:" + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (JWTClientException e) {
|
|
|
|
|
// log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (UserStoreException e) {
|
|
|
|
|
// log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (DeviceManagementException e) {
|
|
|
|
|
// log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// } catch (EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException e) {
|
|
|
|
|
// log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType,
|
|
|
|
|
// e);
|
|
|
|
|
// return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/{type}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response deployDeviceTypeEventDefinition(@PathParam("type") String deviceType,
|
|
|
|
|
@QueryParam("skipPersist") boolean skipPersist,
|
|
|
|
|
@QueryParam("isSharedWithAllTenants") boolean isSharedWithAllTenants,
|
|
|
|
|
@Valid List<DeviceTypeEvent> deviceTypeEvents) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
try {
|
|
|
|
|
for (DeviceTypeEvent deviceTypeEvent : deviceTypeEvents) {
|
|
|
|
|
TransportType transportType = deviceTypeEvent.getTransportType();
|
|
|
|
|
EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList();
|
|
|
|
|
String eventName = deviceTypeEvent.getEventName();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 ||
|
|
|
|
|
deviceType == null || transportType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid Payload";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
String streamName = DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain, eventName);
|
|
|
|
|
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
|
|
|
|
|
|
|
|
|
|
String receiverName = getReceiverName(deviceType, tenantDomain, transportType, eventName);
|
|
|
|
|
publishEventReceivers(streamName, Constants.DEFAULT_STREAM_VERSION, transportType, tenantDomain,
|
|
|
|
|
isSharedWithAllTenants, deviceType, deviceTypeEvent.getEventTopicStructure(), receiverName);
|
|
|
|
|
if (!skipPersist) {
|
|
|
|
|
String rdbmsPublisherName = getPublisherName(deviceType, tenantDomain, eventName) + "_rdbms_publisher";
|
|
|
|
|
publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, rdbmsPublisherName);
|
|
|
|
|
}
|
|
|
|
|
String wsPublisherName = getPublisherName(deviceType, tenantDomain, eventName) + "_ws_publisher";
|
|
|
|
|
publishWebsocketPublisherDefinition(streamName, Constants.DEFAULT_STREAM_VERSION, wsPublisherName);
|
|
|
|
|
try {
|
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
|
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
|
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
|
|
|
|
|
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
|
|
|
|
|
publishEventReceivers(streamName, Constants.DEFAULT_STREAM_VERSION, transportType, tenantDomain,
|
|
|
|
|
isSharedWithAllTenants, deviceType, deviceTypeEvent.getEventTopicStructure(), receiverName);
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (MalformedStreamDefinitionException e) {
|
|
|
|
|
log.error("Failed while creating stream definition, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (EventStreamConfigurationException e) {
|
|
|
|
|
log.error("Failed while configuring stream definition, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (EventPublisherConfigurationException e) {
|
|
|
|
|
log.error("Failed while configuring event publisher, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (EventReceiverConfigurationException e) {
|
|
|
|
|
log.error("Failed while configuring event receiver, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Delete device type specific artifacts from DAS.
|
|
|
|
@ -498,158 +548,172 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
// return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void publishEventReceivers(String streamNameWithVersion, TransportType transportType
|
|
|
|
|
, String requestedTenantDomain, boolean isSharedWithAllTenants, String deviceType)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventReceiverAdminServiceStub receiverAdminServiceStub = DeviceMgtAPIUtils.getEventReceiverAdminServiceStub();
|
|
|
|
|
private void publishEventReceivers(String streamName, String version, TransportType transportType
|
|
|
|
|
, String requestedTenantDomain, boolean isSharedWithAllTenants, String deviceType,
|
|
|
|
|
String eventTopicStructure, String receiverName) throws EventReceiverConfigurationException {
|
|
|
|
|
EventReceiverService eventReceiverService = DeviceMgtAPIUtils.getEventReceiverService();
|
|
|
|
|
try {
|
|
|
|
|
TransportType transportTypeToBeRemoved = TransportType.HTTP;
|
|
|
|
|
if (transportType == TransportType.HTTP) {
|
|
|
|
|
transportTypeToBeRemoved = TransportType.MQTT;
|
|
|
|
|
}
|
|
|
|
|
String eventRecieverNameTobeRemoved = getReceiverName(deviceType, requestedTenantDomain, transportTypeToBeRemoved);
|
|
|
|
|
EventReceiverConfigurationDto eventReceiverConfigurationDto = receiverAdminServiceStub
|
|
|
|
|
.getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved);
|
|
|
|
|
if (eventReceiverConfigurationDto != null) {
|
|
|
|
|
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
|
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {
|
|
|
|
|
};
|
|
|
|
|
receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved
|
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
// TransportType transportTypeToBeRemoved = TransportType.HTTP;
|
|
|
|
|
// if (transportType == TransportType.HTTP) {
|
|
|
|
|
// transportTypeToBeRemoved = TransportType.MQTT;
|
|
|
|
|
// }
|
|
|
|
|
// String eventRecieverNameTobeRemoved = getReceiverName(deviceType, requestedTenantDomain, transportTypeToBeRemoved);
|
|
|
|
|
EventReceiverConfiguration eventReceiverConfiguration =
|
|
|
|
|
eventReceiverService.getActiveEventReceiverConfiguration(receiverName);
|
|
|
|
|
if (eventReceiverConfiguration != null) {
|
|
|
|
|
eventReceiverService.undeployActiveEventReceiverConfiguration(receiverName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String adapterType = OAUTH_MQTT_ADAPTER_TYPE;
|
|
|
|
|
BasicInputAdapterPropertyDto basicInputAdapterPropertyDtos[];
|
|
|
|
|
InputEventAdapterConfiguration inputEventAdapterConfiguration = new InputEventAdapterConfiguration();
|
|
|
|
|
Map<String, String> propertyMap = new HashMap<>();
|
|
|
|
|
if (transportType == TransportType.MQTT) {
|
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[3];
|
|
|
|
|
inputEventAdapterConfiguration.setType(OAUTH_MQTT_ADAPTER_TYPE);
|
|
|
|
|
String topic;
|
|
|
|
|
if (isSharedWithAllTenants) {
|
|
|
|
|
topic = "+/" + deviceType + "/+/events";
|
|
|
|
|
if (!StringUtils.isEmpty(eventTopicStructure)) {
|
|
|
|
|
if (isSharedWithAllTenants) {
|
|
|
|
|
topic = eventTopicStructure.replace("${deviceId}", "+")
|
|
|
|
|
.replace("${deviceType}", deviceType)
|
|
|
|
|
.replace("${tenantDomain}", "+");
|
|
|
|
|
} else {
|
|
|
|
|
topic = eventTopicStructure.replace("${deviceId}", "+")
|
|
|
|
|
.replace("${deviceType}", deviceType)
|
|
|
|
|
.replace("${tenantDomain}", requestedTenantDomain);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
topic = requestedTenantDomain + "/" + deviceType + "/+/events";
|
|
|
|
|
if (isSharedWithAllTenants) {
|
|
|
|
|
topic = "+/" + deviceType + "/+/events";
|
|
|
|
|
} else {
|
|
|
|
|
topic = requestedTenantDomain + "/" + deviceType + "/+/events";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("topic", topic);
|
|
|
|
|
basicInputAdapterPropertyDtos[1] = getBasicInputAdapterPropertyDto(MQTT_CONTENT_TRANSFORMER_TYPE
|
|
|
|
|
, MQTT_CONTENT_TRANSFORMER);
|
|
|
|
|
basicInputAdapterPropertyDtos[2] = getBasicInputAdapterPropertyDto(MQTT_CONTENT_VALIDATOR_TYPE
|
|
|
|
|
, MQTT_CONTENT_VALIDATOR);
|
|
|
|
|
propertyMap.put("topic", topic);
|
|
|
|
|
propertyMap.put(MQTT_CONTENT_TRANSFORMER_TYPE, MQTT_CONTENT_TRANSFORMER);
|
|
|
|
|
propertyMap.put(MQTT_CONTENT_VALIDATOR_TYPE, MQTT_CONTENT_VALIDATOR);
|
|
|
|
|
} else {
|
|
|
|
|
adapterType = THRIFT_ADAPTER_TYPE;
|
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1];
|
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("events.duplicated.in.cluster", "false");
|
|
|
|
|
inputEventAdapterConfiguration.setType(THRIFT_ADAPTER_TYPE);
|
|
|
|
|
propertyMap.put("events.duplicated.in.cluster", "false");
|
|
|
|
|
}
|
|
|
|
|
String eventRecieverName = getReceiverName(deviceType, requestedTenantDomain, transportType);
|
|
|
|
|
if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) {
|
|
|
|
|
inputEventAdapterConfiguration.setProperties(propertyMap);
|
|
|
|
|
|
|
|
|
|
if (eventReceiverService.getActiveEventReceiverConfiguration(receiverName) == null) {
|
|
|
|
|
EventReceiverConfiguration configuration = new EventReceiverConfiguration();
|
|
|
|
|
configuration.setEventReceiverName(receiverName);
|
|
|
|
|
configuration.setToStreamName(streamName);
|
|
|
|
|
configuration.setToStreamVersion(version);
|
|
|
|
|
configuration.setFromAdapterConfiguration(inputEventAdapterConfiguration);
|
|
|
|
|
if (transportType == TransportType.MQTT) {
|
|
|
|
|
receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion
|
|
|
|
|
, adapterType, null, basicInputAdapterPropertyDtos, false);
|
|
|
|
|
JSONInputMapping jsonInputMapping = new JSONInputMapping();
|
|
|
|
|
jsonInputMapping.setCustomMappingEnabled(false);
|
|
|
|
|
configuration.setInputMapping(jsonInputMapping);
|
|
|
|
|
eventReceiverService.deployEventReceiverConfiguration(configuration);
|
|
|
|
|
} else {
|
|
|
|
|
receiverAdminServiceStub.deployWso2EventReceiverConfiguration(eventRecieverName, streamNameWithVersion
|
|
|
|
|
, adapterType, null, null, null, basicInputAdapterPropertyDtos, false, null);
|
|
|
|
|
WSO2EventInputMapping wso2EventInputMapping = new WSO2EventInputMapping();
|
|
|
|
|
wso2EventInputMapping.setCustomMappingEnabled(false);
|
|
|
|
|
configuration.setInputMapping(wso2EventInputMapping);
|
|
|
|
|
eventReceiverService.deployEventReceiverConfiguration(configuration);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(receiverAdminServiceStub);
|
|
|
|
|
} catch (EventReceiverConfigurationException e) {
|
|
|
|
|
log.error("Error while publishing event receiver" , e);
|
|
|
|
|
throw new EventReceiverConfigurationException(e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void publishStreamDefinitons(String streamName, String version, String deviceType
|
|
|
|
|
, EventAttributeList eventAttributes)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
|
|
|
|
|
private void publishStreamDefinitons(String streamName, String version, EventAttributeList eventAttributes)
|
|
|
|
|
throws MalformedStreamDefinitionException, EventStreamConfigurationException {
|
|
|
|
|
EventStreamService eventStreamService = DeviceMgtAPIUtils.getEventStreamService();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
|
|
|
|
|
eventStreamDefinitionDto.setName(streamName);
|
|
|
|
|
eventStreamDefinitionDto.setVersion(version);
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDtos[] =
|
|
|
|
|
new EventStreamAttributeDto[eventAttributes.getList().size()];
|
|
|
|
|
EventStreamAttributeDto metaStreamAttributeDtos[] =
|
|
|
|
|
new EventStreamAttributeDto[1];
|
|
|
|
|
int i = 0;
|
|
|
|
|
StreamDefinition streamDefinition = new StreamDefinition(streamName, version);
|
|
|
|
|
|
|
|
|
|
List<org.wso2.carbon.databridge.commons.Attribute> payloadDataAttributes = new ArrayList<>();
|
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(attribute.getName());
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(attribute.getType().toString());
|
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
|
i++;
|
|
|
|
|
payloadDataAttributes.add(new org.wso2.carbon.databridge.commons.Attribute(attribute.getName(),
|
|
|
|
|
org.wso2.carbon.databridge.commons.AttributeType.valueOf(attribute.getType().name())));
|
|
|
|
|
}
|
|
|
|
|
streamDefinition.setPayloadData(payloadDataAttributes);
|
|
|
|
|
|
|
|
|
|
List<org.wso2.carbon.databridge.commons.Attribute> metaDataAttributes = new ArrayList<>();
|
|
|
|
|
metaDataAttributes.add(new org.wso2.carbon.databridge.commons.Attribute(DEFAULT_DEVICE_ID_ATTRIBUTE,
|
|
|
|
|
org.wso2.carbon.databridge.commons.AttributeType.STRING));
|
|
|
|
|
streamDefinition.setMetaData(metaDataAttributes);
|
|
|
|
|
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(DEFAULT_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(AttributeType.STRING.toString());
|
|
|
|
|
metaStreamAttributeDtos[0] = eventStreamAttributeDto;
|
|
|
|
|
eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos);
|
|
|
|
|
eventStreamDefinitionDto.setMetaData(metaStreamAttributeDtos);
|
|
|
|
|
String streamId = streamName + ":" + version;
|
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
|
|
|
|
|
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
|
|
|
|
|
if (eventStreamService.getStreamDefinition(streamDefinition.getStreamId()) != null) {
|
|
|
|
|
eventStreamService.removeEventStreamDefinition(streamName, version);
|
|
|
|
|
eventStreamService.addEventStreamDefinition(streamDefinition);
|
|
|
|
|
} else {
|
|
|
|
|
eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto);
|
|
|
|
|
eventStreamService.addEventStreamDefinition(streamDefinition);
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
|
|
|
|
|
|
} catch (MalformedStreamDefinitionException e) {
|
|
|
|
|
log.error("Error while initializing stream definition " , e);
|
|
|
|
|
throw new MalformedStreamDefinitionException(e);
|
|
|
|
|
} catch (EventStreamConfigurationException e) {
|
|
|
|
|
log.error("Error while configuring stream definition " , e);
|
|
|
|
|
throw new EventStreamConfigurationException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
|
|
|
|
|
// private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
|
|
|
|
|
// throws RemoteException, UserStoreException, JWTClientException,
|
|
|
|
|
// EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
|
|
|
|
|
// EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub =
|
|
|
|
|
// DeviceMgtAPIUtils.getEventStreamPersistenceAdminServiceStub();
|
|
|
|
|
// try {
|
|
|
|
|
// AnalyticsTable analyticsTable = new AnalyticsTable();
|
|
|
|
|
// analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME);
|
|
|
|
|
// analyticsTable.setStreamVersion(version);
|
|
|
|
|
// analyticsTable.setTableName(streamName);
|
|
|
|
|
// analyticsTable.setMergeSchema(false);
|
|
|
|
|
// analyticsTable.setPersist(true);
|
|
|
|
|
// AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1];
|
|
|
|
|
// int i = 0;
|
|
|
|
|
// for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
// AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
// analyticsTableRecord.setColumnName(attribute.getName());
|
|
|
|
|
// analyticsTableRecord.setColumnType(attribute.getType().toString().toUpperCase());
|
|
|
|
|
// analyticsTableRecord.setFacet(false);
|
|
|
|
|
// analyticsTableRecord.setIndexed(false);
|
|
|
|
|
// analyticsTableRecord.setPersist(true);
|
|
|
|
|
// analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
// analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
// analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
// i++;
|
|
|
|
|
// }
|
|
|
|
|
// AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
// analyticsTableRecord.setColumnName(DEFAULT_META_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
// analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
|
// analyticsTableRecord.setFacet(false);
|
|
|
|
|
// analyticsTableRecord.setIndexed(true);
|
|
|
|
|
// analyticsTableRecord.setPersist(true);
|
|
|
|
|
// analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
// analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
// analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
// analyticsTable.setAnalyticsTableRecords(analyticsTableRecords);
|
|
|
|
|
// eventStreamPersistenceAdminServiceStub.addAnalyticsTable(analyticsTable);
|
|
|
|
|
// } finally {
|
|
|
|
|
// cleanup(eventStreamPersistenceAdminServiceStub);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
private void publishEventStore(String streamName, String version, String publisherName)
|
|
|
|
|
throws EventPublisherConfigurationException {
|
|
|
|
|
|
|
|
|
|
EventPublisherService eventPublisherService = DeviceMgtAPIUtils.getEventPublisherService();
|
|
|
|
|
|
|
|
|
|
private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = DeviceMgtAPIUtils
|
|
|
|
|
.getEventPublisherAdminServiceStub();
|
|
|
|
|
try {
|
|
|
|
|
String eventPublisherName = deviceType.trim().replace(" ", "_") + "_websocket_publisher";
|
|
|
|
|
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(eventPublisherName) == null) {
|
|
|
|
|
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(eventPublisherName
|
|
|
|
|
, streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null
|
|
|
|
|
, null, false);
|
|
|
|
|
if (eventPublisherService.getActiveEventPublisherConfiguration(publisherName) == null) {
|
|
|
|
|
EventPublisherConfiguration configuration = new EventPublisherConfiguration();
|
|
|
|
|
configuration.setEventPublisherName(publisherName);
|
|
|
|
|
configuration.setFromStreamName(streamName);
|
|
|
|
|
configuration.setFromStreamVersion(version);
|
|
|
|
|
MapOutputMapping mapOutputMapping = new MapOutputMapping();
|
|
|
|
|
mapOutputMapping.setCustomMappingEnabled(false);
|
|
|
|
|
configuration.setOutputMapping(mapOutputMapping);
|
|
|
|
|
OutputEventAdapterConfiguration outputEventAdapterConfiguration = new OutputEventAdapterConfiguration();
|
|
|
|
|
outputEventAdapterConfiguration.setType("rdbms");
|
|
|
|
|
Map<String, String> staticProperties = new HashMap<>();
|
|
|
|
|
staticProperties.put("datasource.name", "EVENT_DB");
|
|
|
|
|
staticProperties.put("execution.mode", "insert");
|
|
|
|
|
staticProperties.put("table.name", "table_" + publisherName.replace(".", ""));
|
|
|
|
|
outputEventAdapterConfiguration.setStaticProperties(staticProperties);
|
|
|
|
|
configuration.setProcessEnabled(true);
|
|
|
|
|
configuration.setToAdapterConfiguration(outputEventAdapterConfiguration);
|
|
|
|
|
eventPublisherService.deployEventPublisherConfiguration(configuration);
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(eventPublisherAdminServiceStub);
|
|
|
|
|
|
|
|
|
|
} catch (EventPublisherConfigurationException e) {
|
|
|
|
|
log.error("Error while publishing to rdbms store" , e);
|
|
|
|
|
throw new EventPublisherConfigurationException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void publishWebsocketPublisherDefinition(String streamName, String version, String publisherName)
|
|
|
|
|
throws EventPublisherConfigurationException {
|
|
|
|
|
EventPublisherService eventPublisherService = DeviceMgtAPIUtils.getEventPublisherService();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
if (eventPublisherService.getActiveEventPublisherConfiguration(publisherName) == null) {
|
|
|
|
|
EventPublisherConfiguration configuration = new EventPublisherConfiguration();
|
|
|
|
|
configuration.setEventPublisherName(publisherName);
|
|
|
|
|
configuration.setFromStreamName(streamName);
|
|
|
|
|
configuration.setFromStreamVersion(version);
|
|
|
|
|
JSONOutputMapping jsonOutputMapping = new JSONOutputMapping();
|
|
|
|
|
jsonOutputMapping.setCustomMappingEnabled(false);
|
|
|
|
|
configuration.setOutputMapping(jsonOutputMapping);
|
|
|
|
|
OutputEventAdapterConfiguration outputEventAdapterConfiguration = new OutputEventAdapterConfiguration();
|
|
|
|
|
outputEventAdapterConfiguration.setType("websocket-local");
|
|
|
|
|
configuration.setToAdapterConfiguration(outputEventAdapterConfiguration);
|
|
|
|
|
eventPublisherService.deployEventPublisherConfiguration(configuration);
|
|
|
|
|
}
|
|
|
|
|
} catch (EventPublisherConfigurationException e) {
|
|
|
|
|
log.error("Error while publishing to websocket-local" , e);
|
|
|
|
|
throw new EventPublisherConfigurationException(e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private BasicInputAdapterPropertyDto getBasicInputAdapterPropertyDto(String key, String value) {
|
|
|
|
@ -667,6 +731,13 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getReceiverName(String deviceType, String tenantDomain, TransportType transportType, String eventName) {
|
|
|
|
|
return eventName + "-" + getReceiverName(deviceType, tenantDomain, transportType);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getPublisherName(String tenantDomain, String deviceType, String eventName) {
|
|
|
|
|
return eventName + "_" + tenantDomain.replace(".", "_") + "_" + deviceType;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void cleanup(Stub stub) {
|
|
|
|
|
if (stub != null) {
|
|
|
|
|