|
|
|
@ -16,10 +16,8 @@ import org.wso2.carbon.analytics.stream.persistence.stub.EventStreamPersistenceA
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTable;
|
|
|
|
|
import org.wso2.carbon.analytics.stream.persistence.stub.dto.AnalyticsTableRecord;
|
|
|
|
|
import org.wso2.carbon.base.MultitenantConstants;
|
|
|
|
|
import org.wso2.carbon.base.ServerConfiguration;
|
|
|
|
|
import org.wso2.carbon.context.CarbonContext;
|
|
|
|
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
|
|
|
|
import org.wso2.carbon.core.util.Utils;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
|
|
|
|
import org.wso2.carbon.device.mgt.common.authorization.DeviceAccessAuthorizationException;
|
|
|
|
@ -34,7 +32,6 @@ import org.wso2.carbon.device.mgt.jaxrs.util.Constants;
|
|
|
|
|
import org.wso2.carbon.device.mgt.jaxrs.util.DeviceMgtAPIUtils;
|
|
|
|
|
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceCallbackHandler;
|
|
|
|
|
import org.wso2.carbon.event.publisher.stub.EventPublisherAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.event.publisher.stub.types.EventMappingPropertyDto;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceCallbackHandler;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.EventReceiverAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.event.receiver.stub.types.BasicInputAdapterPropertyDto;
|
|
|
|
@ -42,17 +39,10 @@ import org.wso2.carbon.event.receiver.stub.types.EventReceiverConfigurationDto;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.EventStreamAdminServiceStub;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.types.EventStreamAttributeDto;
|
|
|
|
|
import org.wso2.carbon.event.stream.stub.types.EventStreamDefinitionDto;
|
|
|
|
|
import org.wso2.carbon.identity.jwt.client.extension.JWTClient;
|
|
|
|
|
import org.wso2.carbon.identity.jwt.client.extension.exception.JWTClientException;
|
|
|
|
|
import org.wso2.carbon.user.api.UserStoreException;
|
|
|
|
|
import org.wso2.carbon.analytics.datasource.commons.exception.AnalyticsException;
|
|
|
|
|
|
|
|
|
|
import javax.net.ssl.KeyManagerFactory;
|
|
|
|
|
import javax.net.ssl.SSLContext;
|
|
|
|
|
import javax.net.ssl.TrustManagerFactory;
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
import java.security.cert.CertificateException;
|
|
|
|
|
import javax.validation.Valid;
|
|
|
|
|
import javax.ws.rs.DELETE;
|
|
|
|
|
import javax.ws.rs.GET;
|
|
|
|
@ -64,10 +54,9 @@ import javax.ws.rs.core.Response;
|
|
|
|
|
import java.rmi.RemoteException;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.UUID;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This is used for simple analytics purpose, to create streams and receiver dynamically and a common endpoint
|
|
|
|
|
* This is used for device type integration with DAS, to create streams and receiver dynamically and a common endpoint
|
|
|
|
|
* to retrieve data.
|
|
|
|
|
*/
|
|
|
|
|
@Path("/events")
|
|
|
|
@ -85,7 +74,12 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
private static final String MQTT_CONTENT_TRANSFORMER_TYPE = "contentTransformer";
|
|
|
|
|
private static final String MQTT_CONTENT_VALIDATOR_TYPE = "contentValidator";
|
|
|
|
|
private static final String MQTT_CONTENT_VALIDATOR = "default";
|
|
|
|
|
private static final String TIMESTAMP_FIELD_NAME = "_timestamp";
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Retrieves the stream definition from das for the given device type.
|
|
|
|
|
* @return dynamic event attribute list
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/{type}")
|
|
|
|
|
@Override
|
|
|
|
@ -159,7 +153,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
TransportType transportType = deviceTypeEvent.getTransportType();
|
|
|
|
|
EventAttributeList eventAttributes = deviceTypeEvent.getEventAttributeList();
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
boolean superTenantMode = false;
|
|
|
|
|
try {
|
|
|
|
|
if (eventAttributes == null || eventAttributes.getList() == null || eventAttributes.getList().size() == 0 ||
|
|
|
|
|
deviceType == null || transportType == null ||
|
|
|
|
@ -174,7 +167,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, deviceType);
|
|
|
|
|
publishEventStore(streamName, Constants.DEFAULT_STREAM_VERSION, eventAttributes);
|
|
|
|
|
publishWebsocketPublisherDefinition(streamNameWithVersion, deviceType);
|
|
|
|
|
superTenantMode = true;
|
|
|
|
|
try {
|
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
|
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
@ -182,7 +175,9 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
publishStreamDefinitons(streamName, Constants.DEFAULT_STREAM_VERSION, deviceType, eventAttributes);
|
|
|
|
|
publishEventReceivers(streamNameWithVersion, transportType, tenantDomain, deviceType);
|
|
|
|
|
}
|
|
|
|
|
DeviceMgtAPIUtils.getDynamicEventCache().remove(deviceType);
|
|
|
|
|
} finally {
|
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("failed to create event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
@ -203,19 +198,17 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
log.error("Failed to create event store for, tenantDomain: " + tenantDomain + " deviceType" + deviceType,
|
|
|
|
|
e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} finally {
|
|
|
|
|
if (superTenantMode) {
|
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Delete device type specific artifacts from DAS.
|
|
|
|
|
*/
|
|
|
|
|
@DELETE
|
|
|
|
|
@Path("/{type}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response deleteDeviceTypeEventDefinitions(@PathParam("type") String deviceType) {
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
boolean superTenantMode = false;
|
|
|
|
|
EventReceiverAdminServiceStub eventReceiverAdminServiceStub = null;
|
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = null;
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
|
|
|
|
@ -253,7 +246,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
eventPublisherAdminServiceStub.startundeployInactiveEventPublisherConfiguration(eventPublisherName
|
|
|
|
|
, eventPublisherAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
|
|
superTenantMode = true;
|
|
|
|
|
try {
|
|
|
|
|
PrivilegedCarbonContext.startTenantFlow();
|
|
|
|
|
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
|
|
|
|
|
MultitenantConstants.SUPER_TENANT_DOMAIN_NAME, true);
|
|
|
|
@ -262,10 +255,16 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
tenantBasedEventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
|
|
|
|
|
tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName,
|
|
|
|
|
Constants.DEFAULT_STREAM_VERSION);
|
|
|
|
|
|
|
|
|
|
tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(
|
|
|
|
|
eventReceiverName, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
cleanup(tenantBasedEventReceiverAdminServiceStub);
|
|
|
|
|
cleanup(tenantBasedEventStreamAdminServiceStub);
|
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("failed to delete event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
@ -283,19 +282,17 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
log.error("Failed to access device management service, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} finally {
|
|
|
|
|
if (superTenantMode) {
|
|
|
|
|
PrivilegedCarbonContext.endTenantFlow();
|
|
|
|
|
}
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
|
cleanup(eventPublisherAdminServiceStub);
|
|
|
|
|
cleanup(eventReceiverAdminServiceStub);
|
|
|
|
|
cleanup(eventReceiverAdminServiceStub);
|
|
|
|
|
cleanup(eventStreamAdminServiceStub);
|
|
|
|
|
cleanup(tenantBasedEventReceiverAdminServiceStub);
|
|
|
|
|
cleanup(tenantBasedEventStreamAdminServiceStub);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns device specific data for the give period of time.
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
@ -324,7 +321,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField("_timestamp", SortType.DESC);
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, offset, limit);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
@ -342,6 +339,9 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Returns the last know data point of the device type.
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/last-known/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
@ -361,7 +361,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField("_timestamp", SortType.DESC);
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
@ -531,12 +531,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return basicInputAdapterPropertyDto;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static String generateUUID() {
|
|
|
|
|
UUID uuid = UUID.randomUUID();
|
|
|
|
|
long l = ByteBuffer.wrap(uuid.toString().getBytes(StandardCharsets.UTF_8)).getLong();
|
|
|
|
|
return Long.toString(l, Character.MAX_RADIX);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getTableName(String streamName) {
|
|
|
|
|
return streamName.toUpperCase().replace('.', '_');
|
|
|
|
|
}
|
|
|
|
@ -545,7 +539,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|
private static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI =
|
|
|
|
|
(AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
|
|
|
|
@ -557,7 +551,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return analyticsDataAPI;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
|
|
|
|
|
private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
|
|
|
|
|
, int offset, int limit) throws AnalyticsException {
|
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
|