|
|
@ -2,6 +2,7 @@ package org.wso2.carbon.device.mgt.jaxrs.service.impl;
|
|
|
|
|
|
|
|
|
|
|
|
import org.apache.axis2.AxisFault;
|
|
|
|
import org.apache.axis2.AxisFault;
|
|
|
|
import org.apache.axis2.client.Options;
|
|
|
|
import org.apache.axis2.client.Options;
|
|
|
|
|
|
|
|
import org.apache.axis2.client.Stub;
|
|
|
|
import org.apache.axis2.transport.http.HTTPConstants;
|
|
|
|
import org.apache.axis2.transport.http.HTTPConstants;
|
|
|
|
import org.apache.commons.codec.binary.Base64;
|
|
|
|
import org.apache.commons.codec.binary.Base64;
|
|
|
|
import org.apache.commons.httpclient.Header;
|
|
|
|
import org.apache.commons.httpclient.Header;
|
|
|
@ -82,7 +83,7 @@ import java.util.UUID;
|
|
|
|
* This is used for simple analytics purpose, to create streams and receiver dynamically and a common endpoint
|
|
|
|
* This is used for simple analytics purpose, to create streams and receiver dynamically and a common endpoint
|
|
|
|
* to retrieve data.
|
|
|
|
* to retrieve data.
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Path("/device-types/events")
|
|
|
|
@Path("/events")
|
|
|
|
public class DeviceEventManagementServiceImpl implements DeviceEventManagementService {
|
|
|
|
public class DeviceEventManagementServiceImpl implements DeviceEventManagementService {
|
|
|
|
|
|
|
|
|
|
|
|
private static final Log log = LogFactory.getLog(DeviceEventManagementServiceImpl.class);
|
|
|
|
private static final Log log = LogFactory.getLog(DeviceEventManagementServiceImpl.class);
|
|
|
@ -138,6 +139,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public Response getDeviceTypeEventDefinition(@PathParam("type") String deviceType) {
|
|
|
|
public Response getDeviceTypeEventDefinition(@PathParam("type") String deviceType) {
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
if (deviceType == null ||
|
|
|
|
if (deviceType == null ||
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
@ -146,11 +148,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
String streamName = getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
String streamName = getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = eventStreamAdminServiceStub.getStreamDefinitionDto(
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = eventStreamAdminServiceStub.getStreamDefinitionDto(
|
|
|
|
streamName + ":" + DEFAULT_STREAM_VERSION);
|
|
|
|
streamName + ":" + DEFAULT_STREAM_VERSION);
|
|
|
|
if (eventStreamDefinitionDto == null) {
|
|
|
|
if (eventStreamDefinitionDto == null) {
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
return Response.status(Response.Status.NO_CONTENT).build();
|
|
|
|
return Response.status(Response.Status.NO_CONTENT).build();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData();
|
|
|
|
EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData();
|
|
|
@ -161,7 +162,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
|
|
|
|
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
eventAttributeList.setList(attributes);
|
|
|
|
eventAttributeList.setList(attributes);
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
return Response.ok().entity(eventAttributeList).build();
|
|
|
|
return Response.ok().entity(eventAttributeList).build();
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
@ -178,6 +178,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -250,6 +252,12 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
public Response deleteDeviceTypeEventDefinitions(@PathParam("type") String deviceType) {
|
|
|
|
public Response deleteDeviceTypeEventDefinitions(@PathParam("type") String deviceType) {
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
boolean superTenantMode = false;
|
|
|
|
boolean superTenantMode = false;
|
|
|
|
|
|
|
|
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = null;
|
|
|
|
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = null;
|
|
|
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
EventReceiverAdminServiceStub tenantBasedEventReceiverAdminServiceStub = null;
|
|
|
|
|
|
|
|
EventStreamAdminServiceStub tenantBasedEventStreamAdminServiceStub = null;
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
if (deviceType == null ||
|
|
|
|
if (deviceType == null ||
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
@ -260,43 +268,35 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
String eventReceiverName = getReceiverName(deviceType, tenantDomain);
|
|
|
|
String eventReceiverName = getReceiverName(deviceType, tenantDomain);
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
String streamName = getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
String streamName = getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamName + ":" + DEFAULT_STREAM_VERSION) == null) {
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamName + ":" + DEFAULT_STREAM_VERSION) == null) {
|
|
|
|
return Response.status(Response.Status.NO_CONTENT).build();
|
|
|
|
return Response.status(Response.Status.NO_CONTENT).build();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
|
|
|
|
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
|
|
|
|
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {};
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {};
|
|
|
|
EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler =
|
|
|
|
EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler =
|
|
|
|
new EventPublisherAdminServiceCallbackHandler() {};
|
|
|
|
new EventPublisherAdminServiceCallbackHandler() {};
|
|
|
|
|
|
|
|
|
|
|
|
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
eventReceiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventReceiverName
|
|
|
|
eventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(eventReceiverName
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
|
|
|
|
eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
|
|
|
|
|
|
|
|
eventPublisherAdminServiceStub.startundeployInactiveEventPublisherConfiguration(eventPublisherName
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
|
|
|
|
|
|
|
|
eventPublisherAdminServiceStub.startundeployActiveEventPublisherConfiguration(eventPublisherName
|
|
|
|
|
|
|
|
, eventPublisherAdminServiceCallbackHandler);
|
|
|
|
, eventPublisherAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
eventPublisherAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
eventReceiverAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
superTenantMode = true;
|
|
|
|
superTenantMode = true;
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
|
|
|
|
if (!MultitenantConstants.SUPER_TENANT_DOMAIN_NAME.equals(tenantDomain)) {
|
|
|
|
eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
tenantBasedEventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
tenantBasedEventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
|
|
|
|
|
|
|
|
eventReceiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventReceiverName
|
|
|
|
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
|
|
|
|
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
|
|
|
|
|
|
|
|
eventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(eventReceiverName
|
|
|
|
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
|
|
|
|
eventReceiverAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return Response.ok().build();
|
|
|
|
return Response.ok().build();
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
} catch (AxisFault e) {
|
|
|
@ -318,6 +318,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
if (superTenantMode) {
|
|
|
|
if (superTenantMode) {
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
|
|
|
|
cleanup(eventPublisherAdminServiceStub);
|
|
|
|
|
|
|
|
cleanup(eventReceiverAdminServiceStub);
|
|
|
|
|
|
|
|
cleanup(eventReceiverAdminServiceStub);
|
|
|
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -367,109 +372,120 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
, String requestedTenantDomain, String deviceType)
|
|
|
|
, String requestedTenantDomain, String deviceType)
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
EventReceiverAdminServiceStub receiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
EventReceiverAdminServiceStub receiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
String adapterType = "oauth-mqtt";
|
|
|
|
try {
|
|
|
|
BasicInputAdapterPropertyDto basicInputAdapterPropertyDtos[];
|
|
|
|
String adapterType = "oauth-mqtt";
|
|
|
|
if (transportType == TransportType.MQTT) {
|
|
|
|
BasicInputAdapterPropertyDto basicInputAdapterPropertyDtos[];
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[4];
|
|
|
|
if (transportType == TransportType.MQTT) {
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("topic", requestedTenantDomain
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[4];
|
|
|
|
+ "/" + deviceType + "/+/events");
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("topic", requestedTenantDomain
|
|
|
|
basicInputAdapterPropertyDtos[1] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt");
|
|
|
|
+ "/" + deviceType + "/+/events");
|
|
|
|
basicInputAdapterPropertyDtos[2] = getBasicInputAdapterPropertyDto("cleanSession", "true");
|
|
|
|
basicInputAdapterPropertyDtos[1] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt");
|
|
|
|
basicInputAdapterPropertyDtos[3] = getBasicInputAdapterPropertyDto("clientId", generateUUID());
|
|
|
|
basicInputAdapterPropertyDtos[2] = getBasicInputAdapterPropertyDto("cleanSession", "true");
|
|
|
|
} else {
|
|
|
|
basicInputAdapterPropertyDtos[3] = getBasicInputAdapterPropertyDto("clientId", generateUUID());
|
|
|
|
adapterType = "oauth-http";
|
|
|
|
} else {
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1];
|
|
|
|
adapterType = "oauth-http";
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt");
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1];
|
|
|
|
}
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt");
|
|
|
|
if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) {
|
|
|
|
}
|
|
|
|
receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion
|
|
|
|
if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) {
|
|
|
|
, adapterType, null, basicInputAdapterPropertyDtos, false);
|
|
|
|
receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion
|
|
|
|
|
|
|
|
, adapterType, null, basicInputAdapterPropertyDtos, false);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
cleanup(receiverAdminServiceStub);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
receiverAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void publishStreamDefinitons(String streamName, String version, String deviceType
|
|
|
|
private void publishStreamDefinitons(String streamName, String version, String deviceType
|
|
|
|
, EventAttributeList eventAttributes)
|
|
|
|
, EventAttributeList eventAttributes)
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
|
|
|
|
try {
|
|
|
|
eventStreamDefinitionDto.setName(streamName);
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = new EventStreamDefinitionDto();
|
|
|
|
eventStreamDefinitionDto.setVersion(version);
|
|
|
|
eventStreamDefinitionDto.setName(streamName);
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDtos[] =
|
|
|
|
eventStreamDefinitionDto.setVersion(version);
|
|
|
|
new EventStreamAttributeDto[eventAttributes.getList().size()];
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDtos[] =
|
|
|
|
int i = 0;
|
|
|
|
new EventStreamAttributeDto[eventAttributes.getList().size() + 1];
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
int i = 0;
|
|
|
|
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(attribute.getName());
|
|
|
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(attribute.getType().toString());
|
|
|
|
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
|
|
|
|
i++;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
eventStreamAttributeDto.setAttributeName(attribute.getName());
|
|
|
|
eventStreamAttributeDto.setAttributeName("deviceId");
|
|
|
|
eventStreamAttributeDto.setAttributeType(attribute.getType().toString());
|
|
|
|
eventStreamAttributeDto.setAttributeType(AttributeType.STRING.toString());
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
i++;
|
|
|
|
eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos);
|
|
|
|
}
|
|
|
|
String streamId = streamName + ":" + version;
|
|
|
|
EventStreamAttributeDto metaData[] = new EventStreamAttributeDto[1];
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
|
|
|
|
eventStreamAttributeDto.setAttributeName("deviceId");
|
|
|
|
} else {
|
|
|
|
eventStreamAttributeDto.setAttributeType(AttributeType.STRING.toString());
|
|
|
|
eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto);
|
|
|
|
metaData[0] = eventStreamAttributeDto;
|
|
|
|
}
|
|
|
|
eventStreamDefinitionDto.setMetaData(metaData);
|
|
|
|
} finally {
|
|
|
|
eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos);
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
String streamId = streamName + ":" + version;
|
|
|
|
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.addEventStreamDefinitionAsDto(eventStreamDefinitionDto);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
|
|
|
|
private void publishEventStore(String streamName, String version, EventAttributeList eventAttributes)
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException,
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException,
|
|
|
|
EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
|
|
|
|
EventStreamPersistenceAdminServiceEventStreamPersistenceAdminServiceExceptionException {
|
|
|
|
EventStreamPersistenceAdminServiceStub eventStreamAdminServiceStub =
|
|
|
|
EventStreamPersistenceAdminServiceStub eventStreamPersistenceAdminServiceStub =
|
|
|
|
getEventStreamPersistenceAdminServiceStub();
|
|
|
|
getEventStreamPersistenceAdminServiceStub();
|
|
|
|
AnalyticsTable analyticsTable = new AnalyticsTable();
|
|
|
|
try {
|
|
|
|
analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME);
|
|
|
|
AnalyticsTable analyticsTable = new AnalyticsTable();
|
|
|
|
analyticsTable.setStreamVersion(version);
|
|
|
|
analyticsTable.setRecordStoreName(DEFAULT_EVENT_STORE_NAME);
|
|
|
|
analyticsTable.setTableName(streamName);
|
|
|
|
analyticsTable.setStreamVersion(version);
|
|
|
|
analyticsTable.setMergeSchema(false);
|
|
|
|
analyticsTable.setTableName(streamName);
|
|
|
|
analyticsTable.setPersist(true);
|
|
|
|
analyticsTable.setMergeSchema(false);
|
|
|
|
AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1];
|
|
|
|
analyticsTable.setPersist(true);
|
|
|
|
int i = 0;
|
|
|
|
AnalyticsTableRecord analyticsTableRecords[] = new AnalyticsTableRecord[eventAttributes.getList().size() + 1];
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
int i = 0;
|
|
|
|
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
|
|
|
analyticsTableRecord.setColumnName(attribute.getName());
|
|
|
|
|
|
|
|
analyticsTableRecord.setColumnType(attribute.getType().toString().toUpperCase());
|
|
|
|
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
|
|
|
|
analyticsTableRecord.setPersist(true);
|
|
|
|
|
|
|
|
analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
|
|
|
analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
|
|
|
analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
|
|
|
i++;
|
|
|
|
|
|
|
|
}
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
analyticsTableRecord.setColumnName(attribute.getName());
|
|
|
|
analyticsTableRecord.setColumnName("deviceId");
|
|
|
|
analyticsTableRecord.setColumnType(attribute.getType().toString().toUpperCase());
|
|
|
|
analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
analyticsTableRecord.setPersist(true);
|
|
|
|
analyticsTableRecord.setPersist(true);
|
|
|
|
analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
analyticsTableRecord.setScoreParam(false);
|
|
|
|
analyticsTableRecord.setScoreParam(false);
|
|
|
|
analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
i++;
|
|
|
|
analyticsTable.setAnalyticsTableRecords(analyticsTableRecords);
|
|
|
|
|
|
|
|
eventStreamPersistenceAdminServiceStub.addAnalyticsTable(analyticsTable);
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
cleanup(eventStreamPersistenceAdminServiceStub);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
|
|
|
analyticsTableRecord.setColumnName("meta_deviceId");
|
|
|
|
|
|
|
|
analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
|
|
|
|
analyticsTableRecord.setPersist(true);
|
|
|
|
|
|
|
|
analyticsTableRecord.setPrimaryKey(false);
|
|
|
|
|
|
|
|
analyticsTableRecord.setScoreParam(false);
|
|
|
|
|
|
|
|
analyticsTableRecords[i] = analyticsTableRecord;
|
|
|
|
|
|
|
|
analyticsTable.setAnalyticsTableRecords(analyticsTableRecords);
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.addAnalyticsTable(analyticsTable);
|
|
|
|
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType)
|
|
|
|
private void publishWebsocketPublisherDefinition(String streamNameWithVersion, String deviceType)
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
try {
|
|
|
|
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(eventPublisherName) == null) {
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(eventPublisherName
|
|
|
|
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(eventPublisherName) == null) {
|
|
|
|
, streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null
|
|
|
|
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(eventPublisherName
|
|
|
|
, null, false);
|
|
|
|
, streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null
|
|
|
|
|
|
|
|
, null, false);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
|
|
cleanup(eventPublisherAdminServiceStub);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
eventPublisherAdminServiceStub.cleanup();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private EventStreamAdminServiceStub getEventStreamAdminServiceStub()
|
|
|
|
private EventStreamAdminServiceStub getEventStreamAdminServiceStub()
|
|
|
@ -695,15 +711,16 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
, int offset, int limit) throws AnalyticsException {
|
|
|
|
, int offset, int limit) throws AnalyticsException {
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
|
|
|
|
EventRecords eventRecords = new EventRecords();
|
|
|
|
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
|
|
|
|
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
|
|
|
|
if (eventCount == 0) {
|
|
|
|
if (eventCount == 0) {
|
|
|
|
return null;
|
|
|
|
eventRecords.setCount(0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
|
|
|
|
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
|
|
|
|
sortByFields);
|
|
|
|
sortByFields);
|
|
|
|
List<String> recordIds = getRecordIds(resultEntries);
|
|
|
|
List<String> recordIds = getRecordIds(resultEntries);
|
|
|
|
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
|
|
|
|
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
|
|
|
|
EventRecords eventRecords = new EventRecords();
|
|
|
|
eventRecords.setCount(eventCount);
|
|
|
|
eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response));
|
|
|
|
eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response));
|
|
|
|
return eventRecords;
|
|
|
|
return eventRecords;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -716,4 +733,14 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
return ids;
|
|
|
|
return ids;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void cleanup(Stub stub) {
|
|
|
|
|
|
|
|
if (stub != null) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
stub.cleanup();
|
|
|
|
|
|
|
|
} catch (AxisFault axisFault) {
|
|
|
|
|
|
|
|
// do nothing
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|