Commit e030ae71 authored by shulidong's avatar shulidong

websocket以及新功能

parent 21b89d9b
......@@ -181,6 +181,22 @@
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<!-- 引入hbase -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop-hbase</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......@@ -192,10 +208,10 @@
<artifactId>fastdfs-client-java</artifactId>
<version>1.27.0.0</version>
</dependency>
<dependency>
<!--<dependency>
<groupId>com.github.tobato</groupId>
<artifactId>fastdfs-client</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>com.aspose</groupId> <!--自定义-->
<artifactId>words</artifactId> <!--自定义-->
......
package cn.wise.sc.energy.power.plant.business;
import cn.wise.sc.energy.power.plant.business.jni.WaveAnalysis;
import cn.wise.sc.energy.power.plant.business.security.SecurityConfig;
import org.redisson.spring.cache.RedissonCacheStatisticsAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication
@SpringBootApplication(exclude={
RedissonCacheStatisticsAutoConfiguration.class
})
@ImportAutoConfiguration(SecurityConfig.class)
public class PowerPlantApplication {
public static void main(String[] args) {
System.setProperty("jna.protected","true");
SpringApplication.run(PowerPlantApplication.class, args);
String str = "{\"Record\":{\"Data\":{\"Data\":[{\"Data\":[14.5446,19.6776,33.1913,35.8237,30.389],\"KKSCode\":\"c0a00101mka12cy616r\",\"KeyPhaseOffset\":[],\"Period\":1}],\"Direction\":0,\"GenerationFreq\":50,\"KeyPhaseTyped\":1,\"PoleNum\":2,\"RecordFlag\":1}},\"StartTime\":1597126940000,\"TimeSpan\":1280}";
System.out.println("===========调用开始================");
System.out.println("参数从信息:===");
System.out.println(str);
//String s = WaveAnalysis.INSTANCE.WaveAnalysisModel(str);
System.out.println("===========================");
//System.out.println("结果:" + s);
}
}
......@@ -2,8 +2,12 @@ package cn.wise.sc.energy.power.plant.business.bean;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @description: 计算总功率的返回对象
......@@ -11,7 +15,7 @@ import java.util.List;
* @create: 2020-05-23 12:32
**/
@Data
public class DataPower {
public class DataPower implements Serializable {
/**
* X轴数据
......@@ -25,11 +29,19 @@ public class DataPower {
@JSONField(name = "dataList")
List<PowerPoints> dataList = new ArrayList<>();
@JSONField(name = "dataMap")
Map<String, PowerPoints> dataMap=new HashMap<>();
@JSONField(name = "label")
String label;
/**
* 数据
*/
@Data
public static class PowerPoints {
public static class PowerPoints implements Serializable {
/**
* 测点名
*/
......@@ -37,7 +49,6 @@ public class DataPower {
/**
* kks code
*/
private String kksCode;
/**
* 颜色
......
package cn.wise.sc.energy.power.plant.business.config;
import org.redisson.api.RedissonClient;
import org.redisson.spring.cache.CacheConfig;
import org.redisson.spring.cache.RedissonSpringCacheManager;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableCaching
public class CacheManagerConfig {
@Resource
RedissonClient redissonClient;
@Bean
CacheManager cacheManager() {
Map<String, CacheConfig> config = new HashMap<>();
config.put("SYSTEM_PARAM", new CacheConfig(0, 15 * 60 * 1000));
return new RedissonSpringCacheManager(redissonClient, config);
}
}
\ No newline at end of file
package cn.wise.sc.energy.power.plant.business.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* 支持跨域
*/
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**") // 允许跨域访问的路径
.allowedOrigins("*") // 允许跨域访问的源
.allowedMethods("POST", "GET", "PUT", "OPTIONS", "DELETE") // 允许请求方法
.maxAge(168000) // 预检间隔时间
.allowedHeaders("*") // 允许头部设置
.allowCredentials(true); // 是否发送cookie
}
}
\ No newline at end of file
......@@ -6,6 +6,7 @@ import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.spring.cache.RedissonSpringCacheManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
......@@ -13,6 +14,9 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ConditionalOnClass(Config.class)
@EnableConfigurationProperties(RedissonProperties.class)
......@@ -42,6 +46,4 @@ public class RedissonAutoConfiguration {
return Redisson.create(config);
}
}
\ No newline at end of file
package cn.wise.sc.energy.power.plant.business.controller;
import cn.hutool.core.collection.CollectionUtil;
import cn.wise.sc.energy.power.plant.business.domain.BtreeInfoConfig;
import cn.wise.sc.energy.power.plant.business.domain.redisTrans.BtreeInerAct;
import cn.wise.sc.energy.power.plant.business.domain.redisTrans.BtreeInerAction;
import cn.wise.sc.energy.power.plant.business.domain.redisTrans.BtreeProcess;
import cn.wise.sc.energy.power.plant.business.jna.C2TreeAnalysis;
import cn.wise.sc.energy.power.plant.business.repository.BtreeInfoConfigRepository;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleUtil;
import cn.wise.sc.energy.power.plant.business.utils.BeanUtilsExt;
import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import io.swagger.annotations.ApiOperation;
import org.hibernate.loader.plan.build.internal.LoadGraphLoadPlanBuildingStrategy;
import org.redisson.api.RBucket;
import org.redisson.api.RDeque;
import org.redisson.api.RList;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author neo.shu
* @since 2020/9/14 11:15
*/
@CrossOrigin
@RestController
@RequestMapping("btreeinspec/")
public class BtreeInspectionController {
@Autowired
RedissonClient redissonClient;
@Autowired
BtreeInfoConfigRepository configRepository;
@ApiOperation("获取二叉树列表")
@PostMapping("/configList")
public BaseResponse<List<BtreeInfoConfig>> interaction() {
List<BtreeInfoConfig> list = configRepository.findAll();
return BaseResponse.okData(list);
}
@ApiOperation("更新二叉树")
@PostMapping("/configUpdate")
public BaseResponse<Boolean> interaction(@RequestBody BtreeInfoConfig info) {
BtreeInfoConfig config;
if (info.getId() == null || 0 == info.getId()) {
config = configRepository.save(info);
} else {
config = configRepository.getOne(info.getId());
BeanUtils.copyProperties(info, config, BeanUtilsExt.getNullPropertyNames(info));
config = configRepository.save(config);
}
if (config.getId() != null) {
return BaseResponse.okData(true);
} else {
return BaseResponse.okData(false);
}
}
@PostMapping("/checkProcess")
public BaseResponse getProcess() {
long time = System.currentTimeMillis();
Map<String, Object> result = new HashMap<>();
RBucket<String> currentIndex = redissonClient.getBucket("btreetask:C0A001-1");
if (currentIndex.get() == null) {
result.put("status", -3);
return BaseResponse.okData(result);
}
//查询当前任务的进度
List<BtreeProcess> btreeProcesses = conventClazz("process_" + currentIndex.get(), BtreeProcess.class);
int num = -1;
if (btreeProcesses != null && btreeProcesses.size() > 0) {
for (int i = 0; i < btreeProcesses.size(); i++) {
if (btreeProcesses.get(i).getGrade() == -1.0) {
num = -1;
} else if (btreeProcesses.get(i).getGrade() == -2.0) {
num = -2;
} else if (btreeProcesses.get(i).getGrade() > 0) {
num = 0;
}
//如果有交互,查询交互
if (btreeProcesses.get(i).getGrade() == -2.0) {
List<BtreeInerAction> interAction = conventClazz("inter_" + currentIndex.get(), BtreeInerAction.class);//currentIndex.get()
btreeProcesses.get(i).setBtreeInerAction(CollectionUtil.getLast(interAction));
}
}
}
result.put("status", num);
result.put("data", btreeProcesses);
long endTime = System.currentTimeMillis();
System.out.println(endTime-time);
return BaseResponse.okData(result);
}
private <T> List<T> conventClazz(String currentIndex, Class<T> tClass) {
List<String> redisList = (ArrayList) redissonClient.getList(currentIndex, new StringCodec()).readAll();
List<T> processes = new ArrayList<>();
for (String str : redisList) {
processes.add(JSON.parseObject(str, tClass));
}
return CollectionUtil.reverseNew(processes);
}
@PostMapping("/interaction")
public BaseResponse interaction(@RequestBody BtreeInerAct btreeInerAct) {
//存储redis
RBucket<String> currentIndex = redissonClient.getBucket("btreetask:C0A001-1");
Map<String, Object> result = new HashMap<>();
if (currentIndex.get() == null) {
result.put("status", -3);
return BaseResponse.okData(result);
}
RList<String> a = redissonClient.getList("result_" + currentIndex.get(), new StringCodec());//currentIndex.get()
a.add(0, JSON.toJSONString(btreeInerAct));
//todo 更新状态-2为-1
RList<String> redisList = redissonClient.getList("process_" + currentIndex.get(), new StringCodec());//currentIndex.get()
BtreeProcess btreeProcess = JSON.parseObject(redisList.get(0), BtreeProcess.class);
btreeProcess.setGrade(-1.0);
redisList.remove(0);
redisList.add(0, JSON.toJSONString(btreeProcess));
return BaseResponse.okData("交互成功");
}
@PostMapping("/createBtree")
public BaseResponse<String> del() {
//todo 获取全场机组的最后时间点
//todo 根据规则创建二叉树巡检任务,并存入redis
String taskId = "C0A001-1-" + System.currentTimeMillis();
//每次新建任务都保存一下
redissonClient.getBucket("btreetask:C0A001-1").set(taskId, 5L, TimeUnit.MINUTES);
//todo jna发起任务。
String taskjson = "{\"taskId\":\"" + taskId + "\",\"taskTyped\":0,\"Data\":[1599717386000,1599717386000]}";
try {
new Thread(()->{
C2TreeAnalysis.INSTANCE.MachineryUnitDiagnose(taskjson);
}).start();
} catch (RuntimeException e) {
throw e;
}
return BaseResponse.okData("创建成功");
}
}
......@@ -3,7 +3,6 @@ package cn.wise.sc.energy.power.plant.business.controller;
import cn.hutool.core.util.StrUtil;
import cn.wise.sc.energy.power.plant.business.domain.CaseAnalysisInfo;
import cn.wise.sc.energy.power.plant.business.domain.PageQuery;
import cn.wise.sc.energy.power.plant.business.domain.UserInfoQuery;
import cn.wise.sc.energy.power.plant.business.repository.CaseAnalysisInfoRepository;
import cn.wise.sc.energy.power.plant.business.service.ICaseAnalysisInfoService;
import cn.wise.sc.energy.power.plant.business.utils.BeanUtilsExt;
......@@ -15,11 +14,9 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
......@@ -42,23 +39,24 @@ public class CaseAnalysisInfoController {
@PostMapping("/page")
public BaseResponse<Page<CaseAnalysisInfo>> page(@RequestBody PageQuery page) {
public BaseResponse<Page<CaseAnalysisInfo>> page(@RequestBody PageQuery page) {
if (StrUtil.isBlank(page.getPlantId())){
if (StrUtil.isBlank(page.getPlantId())) {
return BaseResponse.errorMsg("电厂id不能为空!");
}
return caseAnalysisInfoService.page( page, page.getPlantId());
return caseAnalysisInfoService.page(page, page.getPlantId());
}
@PostMapping("/newOrUpdate")
@Transactional(propagation = Propagation.REQUIRED)
public BaseResponse<Boolean> add(@RequestBody CaseAnalysisInfo info) {
CaseAnalysisInfo caseAnalysisInfo ;
if (info.getCaseId() == null) {
caseAnalysisInfo = caseRepository.save(info);
}else{
CaseAnalysisInfo caseAnalysisInfo;
if (info.getCaseId() == null || "".equals(info.getCaseId().trim())) {
info.setCaseId(null);
caseAnalysisInfo = caseRepository.save(info);
} else {
caseAnalysisInfo = caseRepository.getOne(info.getCaseId());
BeanUtils.copyProperties(info,caseAnalysisInfo, BeanUtilsExt.getNullPropertyNames(info));
BeanUtils.copyProperties(info, caseAnalysisInfo, BeanUtilsExt.getNullPropertyNames(info));
caseAnalysisInfo.setUpdateTime(LocalDateTime.now());
caseAnalysisInfo = caseRepository.save(caseAnalysisInfo);
}
......@@ -70,8 +68,16 @@ public class CaseAnalysisInfoController {
}
@PostMapping("/del")
public BaseResponse<Boolean> del(String id){
return null;
public BaseResponse<String> del(String id) {
CaseAnalysisInfo caseAnalysisInfo = caseRepository.getOne(id);
caseAnalysisInfo.setValid((short) 1);
caseAnalysisInfo.setUpdateTime(LocalDateTime.now());
caseAnalysisInfo = caseRepository.save(caseAnalysisInfo);
if (caseAnalysisInfo != null) {
return BaseResponse.okData("删除成功");
} else {
return BaseResponse.okData("删除失败");
}
}
}
......@@ -7,6 +7,7 @@ import cn.wise.sc.energy.power.plant.business.domain.vo.EntityVo;
import cn.wise.sc.energy.power.plant.business.service.IDetectionService;
import cn.wise.sc.energy.power.plant.business.service.IPlantInfoService;
import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
......@@ -16,9 +17,12 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.validation.constraints.NotEmpty;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* @description: 电厂信息controller
......@@ -80,24 +84,28 @@ public class PlantInfoController {
if (initPower == null) {
return BaseResponse.errorMsg("当前时间点没有数据!");
}
List<DataPower.PowerPoints> dataList = initPower.getDataList();
Map<String, DataPower.PowerPoints> result = new HashMap<>();
if (!CollectionUtils.isEmpty(dataList)) {
Map<String, List<DataPower.PowerPoints>> map = new HashMap<>();
map = initPower.getDataList().stream().collect(Collectors.groupingBy(item -> item.getKksCode().substring(0, 8)));
map.forEach((key, value) -> {
//根据deviceId获取list,根据kkscode排序,选择value非空的第一个值
result.putIfAbsent(key.substring(0, 8), value.stream().sorted(Comparator.comparing(DataPower.PowerPoints::getKksCode)).filter(item -> !CollectionUtils.isEmpty(item.getValue())).findFirst().orElse(null));
});
}
initPower.setDataList(null);
//如果是机组
if (StrUtil.isNotBlank(deviceId)) {
DataPower.PowerPoints powerPoints = initPower.getDataList().get(0);
initPower.getDataList().clear();
initPower.getDataList().add(powerPoints);
initPower.setDataMap(new HashMap<String, DataPower.PowerPoints>() {{
put(deviceId, result.get(deviceId));
}});
return BaseResponse.okData(initPower);
} else {
List<DataPower.PowerPoints> newDataList = new ArrayList<>(2);
List<DataPower.PowerPoints> dataList = initPower.getDataList();
for (DataPower.PowerPoints powerPoints : dataList) {
if (powerPoints.getName().contains("1F")){
newDataList.add(0,powerPoints);
}else {
newDataList.add(1,powerPoints);
}
}
initPower.setDataList(newDataList);
initPower.setDataMap(result);
return BaseResponse.okData(initPower);
}
return BaseResponse.okData(initPower);
}
}
......@@ -17,7 +17,6 @@ import javax.persistence.Table;
**/
@Data
@Entity
@Table(name = "btreeinfo")
public class BtreeInfo extends AbstractEntity<Long> {
@Override
......
package cn.wise.sc.energy.power.plant.business.domain;
import com.alibaba.fastjson.JSONObject;
import com.vladmihalcea.hibernate.type.json.JsonStringType;
import lombok.Data;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
/**
* @description:
* @author: neo.shu
* @create: 2020-09-14 10:00
**/
@Data
@Entity
@Table(name = "btreeinfo",schema = "btreeinfo")
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class BtreeInfoConfig extends AbstractEntity<Long> implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String btreetype;
private String btreename;
private String btreefunname;
private Float version;
@Type(type = "json")
private JSONObject info;
}
package cn.wise.sc.energy.power.plant.business.domain;
import com.fasterxml.jackson.annotation.JsonValue;
import com.sun.xml.internal.ws.api.message.Attachment;
import com.vladmihalcea.hibernate.type.json.JsonStringType;
import lombok.Data;
import lombok.NoArgsConstructor;
......@@ -14,6 +12,7 @@ import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;
import java.util.List;
......@@ -56,6 +55,13 @@ public class CaseAnalysisInfo extends AbstractEntity<String>{
@Column(name="update_time")
private LocalDateTime updateTime = LocalDateTime.now();
@Column(name="valid")
private short valid=0;
@Column(name="user_name")
private String username;
@Override
public String getId() {
......
......@@ -16,6 +16,7 @@ import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.List;
/**
......@@ -27,7 +28,7 @@ import java.util.List;
@Data
@NoArgsConstructor
@Table(name = "characterparaminfo")
public class CharacterParamInfo extends AbstractEntity<String> {
public class CharacterParamInfo extends AbstractEntity<String> implements Serializable {
/**
* 转换成Vo
......
......@@ -25,4 +25,6 @@ public class PageQuery implements Serializable {
private String startTime;
private String endTime;
private String order;
}
package cn.wise.sc.energy.power.plant.business.domain;
import cn.hutool.core.builder.EqualsBuilder;
import cn.hutool.core.builder.HashCodeBuilder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.security.core.GrantedAuthority;
......@@ -107,4 +109,23 @@ public class UserInfo extends AbstractEntity<String> implements UserDetails {
public boolean isEnabled() {
return true;
}
@Override
public int hashCode() {
// 自定义userInfo hashCode方法,session管理时会用做key
return new HashCodeBuilder(17, 37).append(getId()).append(getUsername()).toHashCode();
}
@Override
public boolean equals(Object obj) {
// 自定义equals方法
boolean isEqual = false;
if (obj != null && UserInfo.class.isAssignableFrom(obj.getClass())) {
UserInfo userInfo = (UserInfo) obj;
isEqual = new EqualsBuilder().append(getId(), userInfo.getId())
.append(getUsername(), userInfo.getUsername()).append(getUsername(), userInfo.getUsername()).isEquals();
}
return isEqual;
}
}
\ No newline at end of file
package cn.wise.sc.energy.power.plant.business.domain.redisTrans;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import javax.ws.rs.GET;
import java.io.Serializable;
import java.util.List;
/**
* @author neo.shu
* @since 2020/9/14 15:45
*/
@Data
@Builder
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class BtreeInerAction implements Serializable {
//诊断ID
private String id;
//问题id
private String qid;
//问题描述
private String name;
//图表数据(无图标为空){“Y”:[1,2,3,4,5,6]}
private List<String> data;
//选项内容
private List<String> select;
}
package cn.wise.sc.energy.power.plant.business.domain.redisTrans;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
/**
* @author neo.shu
* @since 2020/9/14 13:40
*/
@Data
@Builder
@ToString
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class BtreeProcess implements Serializable {
//有正值表示诊断完成。默认-1。-2表示需要交互
private double grade;
//步数
private int step;
//诊断ID
private String id;
//监测特征所属电站的标识
private String plantid;
//监测特征所属设备的标识
private String deviceid;
//监测特征所属监测单元标识
private String unitcodeid;
//算法类型
private String typed;
//监测单元名称
private String unitname;
//二叉树诊断名称
private String btreename;
//0:正常,1:异常2:其他
private int result;
private String diagnosis;
private BtreeInerAction btreeInerAction;
}
package cn.wise.sc.energy.power.plant.business.jna;
import com.sun.jna.Library;
import com.sun.jna.Native;
import java.util.concurrent.ScheduledFuture;
/**
* @description:
* @author: neo.shu
* @create: 2020-09-18 9:40
**/
public interface C2TreeAnalysis extends Library {
//
String dllPath = "C2TreeAnalysis";
C2TreeAnalysis INSTANCE = Native.loadLibrary(dllPath, C2TreeAnalysis.class);
/*
* "{\"taskId\":任务id,\"taskTyped\":0(固定值),\"Data\":[1599717386000,1599717386000]每个机组的最后的时间戳}"
*/
String MachineryUnitDiagnose(String jstr);
}
package cn.wise.sc.energy.power.plant.business.jni;
package cn.wise.sc.energy.power.plant.business.jna;
import com.sun.jna.Library;
import com.sun.jna.Native;
......@@ -10,8 +10,8 @@ import com.sun.jna.Native;
**/
public interface WaveAnalysis extends Library {
//
String dllPath = "libGMWaveAnalysisModel.so";
WaveAnalysis INSTANCE = Native.load(dllPath, WaveAnalysis.class);
String dllPath = "GMWaveAnalysisModel";
WaveAnalysis INSTANCE = Native.loadLibrary(dllPath, WaveAnalysis.class);
String WaveAnalysisModel(String jstr);
......
......@@ -24,6 +24,7 @@ public class PutExt extends BaseBean {
private Long timestamp;
private Number value;
private Map<String, String> tags;
private String tsuid;
public PutExt putTag(String key, String value) {
if (getTags() == null) {
......
package cn.wise.sc.energy.power.plant.business.repository;
import cn.wise.sc.energy.power.plant.business.domain.CharacterParamInfo;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import javax.persistence.Entity;
import javax.persistence.Table;
import java.util.List;
import java.util.Map;
/**
* @description: 测点特征仓储
......@@ -30,4 +25,7 @@ public interface CharacterParamRepository extends
@Query(value = "SELECT COUNT(deviceId),deviceId FROM CharacterParamInfo GROUP BY deviceId")
List<Object[]> countAlert();
List<CharacterParamInfo> findAllByPlantId(String plantId);
}
package cn.wise.sc.energy.power.plant.business.security;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.business.domain.UserInfo;
import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.authentication.AuthenticationServiceException;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
......@@ -21,6 +25,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import static cn.wise.sc.energy.power.plant.business.security.SecurityConstants.AUTHORIZATION_HEADER;
import static cn.wise.sc.energy.power.plant.business.security.SecurityConstants.TOKEN_PREFIX;
......@@ -32,6 +37,7 @@ public class JWTAuthenticationFilter extends
private final AuthenticationManager authenticationManager;
private final ObjectMapper objectMapper = new ObjectMapper();
public JWTAuthenticationFilter(
final AuthenticationManager authenticationManager) {
this.authenticationManager = authenticationManager;
......@@ -53,16 +59,20 @@ public class JWTAuthenticationFilter extends
}
}
RedissonClient redissonClient = (RedissonClient) MyApplicationContextAware.getApplicationContext().getBean("redissonSingle");
@Override
protected void successfulAuthentication(final HttpServletRequest request,
final HttpServletResponse response, final FilterChain chain,
final Authentication authResult) {
UserInfo info = (UserInfo) authResult.getPrincipal();
final String token = Jwts.builder()
.setSubject(((UserInfo) authResult.getPrincipal()).getUsername())
.setSubject(info.getUsername())
.setIssuedAt(new Date())
.setExpiration(Date.from(OffsetDateTime.now().plusDays(5).toInstant()))
.signWith(JWTKeyHolder.KEY, SignatureAlgorithm.HS512)
.signWith(SignatureAlgorithm.HS256,JWTKeyHolder.RISEN_KEY)
.compact();
RBucket<String> cacheToken = redissonClient.getBucket("token:" + info.getUsername());
cacheToken.set(token,24, TimeUnit.HOURS);
response.addHeader(AUTHORIZATION_HEADER, TOKEN_PREFIX + token);
try {
String role = ((UserInfo) authResult.getPrincipal()).getRole() + "";
......@@ -75,9 +85,4 @@ public class JWTAuthenticationFilter extends
}
// response.addCookie(new Cookie(SecurityConstants.AUTH_COOKIE, token));
}
public static void main(String[] args) {
String t = "asdadasd\tadasdasd";
System.out.println(t);
}
}
package cn.wise.sc.energy.power.plant.business.security;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import com.alibaba.fastjson.JSON;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Header;
import io.jsonwebtoken.Jwts;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.http.HttpStatus;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.AuthenticationException;
......@@ -21,12 +28,13 @@ import java.io.IOException;
import java.util.Objects;
import static cn.wise.sc.energy.power.plant.business.security.SecurityConstants.AUTHORIZATION_HEADER;
import static cn.wise.sc.energy.power.plant.business.security.SecurityConstants.SOCKET_TOKEN;
import static cn.wise.sc.energy.power.plant.business.security.SecurityConstants.TOKEN_PREFIX;
public class JWTFilter extends BasicAuthenticationFilter {
private final UserDetailsService userDetailsService;
RedissonClient redissonClient = (RedissonClient) MyApplicationContextAware.getApplicationContext().getBean("redissonSingle");
public JWTFilter(final AuthenticationManager authenticationManager,
final UserDetailsService userDetailsService) {
super(authenticationManager);
......@@ -38,26 +46,46 @@ public class JWTFilter extends BasicAuthenticationFilter {
final HttpServletResponse response, final FilterChain filterChain)
throws ServletException, IOException {
final String token = this.getToken(request);
if (!StringUtils.hasText(token)) {
//设置websocket 子协议头
response.setHeader("Sec-WebSocket-Protocol",request.getHeader("Sec-WebSocket-Protocol"));
if(true){
filterChain.doFilter(request, response);
return;
}
if(request.getServletPath().contains("/login")||request.getServletPath().contains("/plantInfo/allPlantInfo")){
filterChain.doFilter(request, response);
return;
}
if (!StringUtils.hasText(token)) {
response.setStatus(HttpStatus.UNAUTHORIZED.value());
response.setContentType("text/html;charset=utf-8");
response.getWriter().write(JSON.toJSONString(BaseResponse.errorMsg("没有token权限")));
return;
}
Claims claims;
try {
claims = Jwts.parserBuilder()
.setSigningKey(JWTKeyHolder.KEY)
.setSigningKey(JWTKeyHolder.RISEN_KEY)
.build()
.parseClaimsJws(token)
.getBody();
} catch (Exception e) {
response.setStatus(401);
filterChain.doFilter(request, response);
response.setStatus(HttpStatus.UNAUTHORIZED.value());
response.setContentType("text/html;charset=utf-8");
response.getWriter().write(JSON.toJSONString(BaseResponse.errorMsg("无权限用户")));
return;
}
final UserDetails userDetails = this.userDetailsService
.loadUserByUsername(claims.getSubject());
RBucket<String> cacheToken = redissonClient.getBucket("token:" + userDetails.getUsername());
if (!token.equals(cacheToken.get())) {
response.setStatus(HttpStatus.UNAUTHORIZED.value());
response.setContentType("text/html;charset=utf-8");
response.getWriter().write(JSON.toJSONString(BaseResponse.errorMsg("toke已过期")));
return;
}
SecurityContextHolder.getContext()
.setAuthentication(new UsernamePasswordAuthenticationToken(
userDetails.getUsername(), null, userDetails.getAuthorities()
......@@ -66,17 +94,11 @@ public class JWTFilter extends BasicAuthenticationFilter {
}
private String getToken(final HttpServletRequest request) {
// if (request.getCookies() != null) {
// for (final Cookie cookie : request.getCookies()) {
// if (Objects.equals(cookie.getName(), SecurityConstants.AUTH_COOKIE)) {
// if (StringUtils.hasText(cookie.getValue())) {
// return cookie.getValue();
// }
// }
// }
// }
final String header = request.getHeader(AUTHORIZATION_HEADER);
if (header != null && header.startsWith(TOKEN_PREFIX)) {
final String web_header = request.getHeader(AUTHORIZATION_HEADER);
//socket token
final String socket_header = request.getHeader(SOCKET_TOKEN);
String header = socket_header!=null?TOKEN_PREFIX+socket_header:web_header;
if (header != null && header.startsWith(TOKEN_PREFIX)) {
return header.substring(TOKEN_PREFIX.length());
}
return null;
......
......@@ -10,4 +10,5 @@ import java.security.Key;
public class JWTKeyHolder {
public static Key KEY = MacProvider.generateKey();
public static String RISEN_KEY = "Risen12345678987654321nesiRHOWMANYBYTESDOYOUWANTIAMBOREDWITHTHIS";
}
......@@ -9,9 +9,14 @@ import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.session.SessionRegistry;
import org.springframework.security.core.session.SessionRegistryImpl;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
import org.springframework.security.web.session.ConcurrentSessionFilter;
import javax.annotation.Resource;
@Configuration
@EnableWebSecurity
......@@ -25,6 +30,21 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
this.jwtUserDetailsService = jwtUserDetailsService;
}
@Resource
private SessionRegistry sessionRegistry;
@Bean
public SessionRegistry sessionRegistry() {
return new SessionRegistryImpl();
}
@Bean
public ConcurrentSessionFilter concurrencyFilter() {
// 定义session失效后,重定向的url
ConcurrentSessionFilter concurrentSessionFilter = new ConcurrentSessionFilter(sessionRegistry(), "/login");
return concurrentSessionFilter;
}
@Autowired
public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
// configure AuthenticationManager so that it knows from where to load
......@@ -32,11 +52,12 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
// Use BCryptPasswordEncoder
auth.userDetailsService(jwtUserDetailsService).passwordEncoder(passwordEncoder());
}
@Override
protected void configure(final HttpSecurity http) throws Exception {
http.csrf().disable()
.authorizeRequests()
.antMatchers("/login").permitAll()
.antMatchers("/login","/**/allPlantInfo").permitAll()
.antMatchers("/admin").hasRole("ADMIN")
.antMatchers(
"/swagger-ui.html",
......@@ -57,6 +78,13 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
UsernamePasswordAuthenticationFilter.class)
.sessionManagement()
.sessionCreationPolicy(SessionCreationPolicy.STATELESS);
http.addFilterAt(concurrencyFilter(), ConcurrentSessionFilter.class);
//session管理
//session失效后跳转
http.sessionManagement().invalidSessionUrl("/login");
//只允许一个用户登录,如果同一个账户两次登录,那么第一个账户将被踢下线,跳转到登录页面
http.sessionManagement().maximumSessions(1).sessionRegistry(sessionRegistry).expiredUrl("/login");
}
// @Override
......
......@@ -3,6 +3,7 @@ package cn.wise.sc.energy.power.plant.business.security;
public interface SecurityConstants {
String AUTHORIZATION_HEADER = "Authorization";
String SOCKET_TOKEN = "Sec-WebSocket-Protocol";
String TOKEN_PREFIX = "Bearer ";
String AUTH_COOKIE = "plant-auth";
}
......@@ -3,11 +3,10 @@ package cn.wise.sc.energy.power.plant.business.service;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.bean.TimeModelQuery;
import cn.wise.sc.energy.power.plant.business.domain.CharacterParamInfo;
import cn.wise.sc.energy.power.plant.business.domain.Frequency;
import cn.wise.sc.energy.power.plant.business.domain.Oscillogram;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.domain.vo.EntityVo;
import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import net.opentsdb.client.api.query.response.QueryResponse;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
......@@ -22,6 +21,8 @@ import java.util.Map;
**/
public interface ICharacterParamService extends IBaseService<String, CharacterParamInfo> {
List<CharacterParamInfo> list(String plantid);
Object count();
/**
......@@ -68,6 +69,8 @@ public interface ICharacterParamService extends IBaseService<String, CharacterPa
*/
Map<Long, Map<String, Number>> getRealTimeDataByKksCode(List<String> kksCodes, String start, String downSample);
QueryResponse getRealTimeDataByKksCode(List<String> kksCodes, String start, String end, String downSample);
/**
* 根据kksCode获取功率实时数据 4小时一统计
*
......
......@@ -2,6 +2,7 @@ package cn.wise.sc.energy.power.plant.business.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.wise.sc.energy.power.plant.business.domain.AbstractEntity;
import cn.wise.sc.energy.power.plant.business.domain.CharacterParamInfo;
import cn.wise.sc.energy.power.plant.business.domain.vo.EntityVo;
import cn.wise.sc.energy.power.plant.business.service.IBaseService;
import cn.wise.sc.energy.power.plant.common.core.bean.EntityQuery;
......@@ -23,7 +24,7 @@ import java.util.Optional;
* @create: 2020-07-14 17:01
**/
@Slf4j
public class BaseServiceImpl<ID, T extends AbstractEntity<ID>>
public abstract class BaseServiceImpl<ID, T extends AbstractEntity<ID>>
implements IBaseService<ID, T> {
@Autowired
......@@ -123,4 +124,5 @@ public class BaseServiceImpl<ID, T extends AbstractEntity<ID>>
}
return tClass;
}
}
package cn.wise.sc.energy.power.plant.business.service.impl;
import cn.hutool.core.util.StrUtil;
import cn.wise.sc.energy.power.plant.business.domain.CaseAnalysisInfo;
import cn.wise.sc.energy.power.plant.business.domain.PageQuery;
import cn.wise.sc.energy.power.plant.business.repository.CaseAnalysisInfoRepository;
import cn.wise.sc.energy.power.plant.business.service.ICaseAnalysisInfoService;
import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import io.jsonwebtoken.lang.Collections;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
......@@ -16,11 +14,7 @@ import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.persistence.criteria.Expression;
import javax.persistence.criteria.Path;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Selection;
import javax.persistence.criteria.Subquery;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
......@@ -38,6 +32,9 @@ public class CaseAnalysisInfoServiceImpl implements ICaseAnalysisInfoService {
public BaseResponse<Page<CaseAnalysisInfo>> page(PageQuery page, String plantId) {
Sort sort = Sort.by(Sort.Direction.DESC, "caseId");
if (page.getOrder() != null && !page.getOrder().trim().equals("")) {
sort = Sort.by(Sort.Direction.DESC, page.getOrder());
}
Pageable pages = PageRequest.of(page.getPageNo(), page.getPageSize(), sort);
Page<CaseAnalysisInfo> infoPage;
......@@ -47,9 +44,10 @@ public class CaseAnalysisInfoServiceImpl implements ICaseAnalysisInfoService {
if (!StringUtils.isEmpty(page.getVaguewords())) {
list.add(cb.like(root.get("title").as(String.class), "%" + page.getVaguewords() + "%"));
}
list.add(cb.equal(root.get("valid"), 0));
//电厂id必传
if (!StringUtils.isEmpty(plantId)) {
list.add(cb.equal(root.get("plantId"),plantId));
list.add(cb.equal(root.get("plantId"), plantId));
}
//匹配关键字
if (page.getKeywords().size() > 0) {
......@@ -58,6 +56,7 @@ public class CaseAnalysisInfoServiceImpl implements ICaseAnalysisInfoService {
Expression expression = query.
list.add(cb.isTrue());
}*/
}
//时间选择
......
......@@ -8,14 +8,12 @@ import cn.wise.sc.energy.power.plant.business.bean.TimeModelQuery;
import cn.wise.sc.energy.power.plant.business.domain.CharacterParamInfo;
import cn.wise.sc.energy.power.plant.business.domain.FrequencyQuery;
import cn.wise.sc.energy.power.plant.business.domain.Oscillogram;
import cn.wise.sc.energy.power.plant.business.domain.OscillogramRowMapper;
import cn.wise.sc.energy.power.plant.business.domain.OscillogramTagsMapper;
import cn.wise.sc.energy.power.plant.business.domain.RowKeyMapper;
import cn.wise.sc.energy.power.plant.business.domain.UnitInfo;
import cn.wise.sc.energy.power.plant.business.domain.eum.TendencyStatus;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.domain.vo.EntityVo;
import cn.wise.sc.energy.power.plant.business.jni.WaveAnalysis;
import cn.wise.sc.energy.power.plant.business.jna.WaveAnalysis;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.opentsdb.bean.QueryExt;
import cn.wise.sc.energy.power.plant.business.opentsdb.bean.QueryRequestExt;
......@@ -32,17 +30,8 @@ import lombok.extern.slf4j.Slf4j;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.Aggregator;
import net.opentsdb.client.bean.QueryResult;
import org.apache.commons.math3.stat.Frequency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.BeanUtils;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.domain.Example;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
......@@ -92,6 +81,14 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
this.hbaseTemplate = hbaseTemplate;
}
@Override
@Cacheable(value = "CHARACTER_PARAM_INFO", key = "#plantid")
public List<CharacterParamInfo> list(String plantid) {
return characterParamRepository.findAllByPlantId(plantid);
}
@Override
public List<Object[]> count() {
return characterParamRepository.countAlert();
......@@ -192,6 +189,8 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
QueryRequestExt queryRequestExt = QueryRequestExt.builder()
.start(start)
.build();
//获取最后一条数据 然后根据这个查询
for (CharacterParamInfo characterParamInfo : characterParamInfos) {
QueryExt build = QueryExt.builder()
.aggregator(Aggregator.AVG)
......@@ -223,6 +222,49 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
return rts;
}
@Override
public QueryResponse getRealTimeDataByKksCode(List<String> kksCodes, String start, String end, String downSample) {
ResponseEnum.COLLECTION_NOT_ILLEGAL.assertCollectionNotILLEGAL(kksCodes);
//去数据库匹配测点
List<CharacterParamInfo> characterParamInfos = characterParamRepository.findAll((Specification<CharacterParamInfo>) (root, query, criteriaBuilder) -> {
Path<String> kkscodePath = root.get("kksCode");
CriteriaBuilder.In<String> in = criteriaBuilder.in(kkscodePath);
for (String kkscode : kksCodes) {
in.value(kkscode);
}
return in;
});
QueryRequestExt queryRequestExt = QueryRequestExt.builder()
.start(start)
.end(end)
.build();
//获取最后一条数据 然后根据这个查询
for (CharacterParamInfo characterParamInfo : characterParamInfos) {
QueryExt build = QueryExt.builder()
.aggregator(Aggregator.AVG)
.metric(characterParamInfo.getKksCode())
.build();
if (StrUtil.isNotBlank(downSample)) {
build.setDownsample(downSample);
}
queryRequestExt.addQuery(build);
}
QueryResponse response;
try {
response = OpentsdbOkHttpClient.query(queryRequestExt);
//没有数据时
if (response.getResults().size() == 0) {
response = enterWhenItIsARind(characterParamInfos);
}
return response;
} catch (IOException | URISyntaxException e) {
log.error("根据kksCode获取当前start时候下的实时数据失败!");
response = enterWhenItIsARind(characterParamInfos);
return response;
}
}
@Override
public Map<String, Number> getRealTimeDataPower(List<String> kksCodes, String start) {
......@@ -348,8 +390,12 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
@Override
public String getFrequency(String toJSONString) {
return WaveAnalysis.INSTANCE.WaveAnalysisModel(toJSONString);
try {
String a = WaveAnalysis.INSTANCE.WaveAnalysisModel(toJSONString);
return a;
} catch (Exception e) {
return null;
}
}
@Override
......@@ -534,7 +580,7 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
timeStr.add(split[0]);
//count!=0时 为数据行 第一个为时间 后面为数据
for (int i = 1; i < split.length; i++) {
String kKSCode = kKSCodes.get(i-1);
String kKSCode = kKSCodes.get(i - 1);
data.get(kKSCode).add(Float.parseFloat(split[i]));
}
}
......@@ -543,11 +589,12 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
bufferedReader.close();
//拿到时间计算偏移量和起始时间
DateTime startParse = DateUtil.parse(timeStr.get(0), "YYYY/MM/DD HH:mm:ss");
DateTime startParse = DateUtil.parse(timeStr.get(0), "YYYY/MM/DD HH:mm:ss.SSS");
long startTime = startParse.getTime();
DateTime endParse = DateUtil.parse(timeStr.get(timeStr.size() - 1), "YYYY/MM/DD HH:mm:ss");
DateTime endParse = DateUtil.parse(timeStr.get(timeStr.size() - 1), "YYYY/MM/DD HH:mm:ss.SSS");
long endTime = endParse.getTime();
long timeSpan = endTime - startTime;
//毫秒转化成微秒
long timeSpan = (endTime - startTime) * 1000;
//封装频谱计算数据
FrequencyQuery frequencyQuery = new FrequencyQuery();
......@@ -613,7 +660,6 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
if (powerPoints.getValue().size() == 0) {
powerPoints.getValue().add("0");
}
//todo 查询阈值 color中添加阈值集合
}
return dataPower;
......
......@@ -2,6 +2,7 @@ package cn.wise.sc.energy.power.plant.business.service.impl;
import cn.hutool.core.util.StrUtil;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.CharacterParamInfo;
import cn.wise.sc.energy.power.plant.business.domain.DeviceInfo;
import cn.wise.sc.energy.power.plant.business.domain.PlantInfo;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
......@@ -15,15 +16,22 @@ import cn.wise.sc.energy.power.plant.common.core.bean.BaseResponse;
import cn.wise.sc.energy.power.plant.common.core.exception.ResponseEnum;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import net.opentsdb.client.api.query.request.QueryLastRequest;
import net.opentsdb.client.api.query.response.QueryLastResponse;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.Aggregator;
import net.opentsdb.client.bean.LastDataPoint;
import net.opentsdb.client.bean.LastDataPointQuery;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @description: 电厂信息服务层实现
......@@ -64,44 +72,52 @@ public class PlantInfoServiceImpl extends BaseServiceImpl<String, PlantInfo> imp
}
}
public void a() {
System.out.println("调用service");
}
@Override
public DataPower getInitPower(String plantId, String deviceId) {
ResponseEnum.LICENCE_NOT_FOUND.assertNotEmpty(plantId);
if (StrUtil.isBlank(deviceId)){
if (StrUtil.isBlank(deviceId)) {
deviceId = "";
}
List<String> cpName = new ArrayList<>(1);
cpName.add("燃机发电机有功功率");
cpName.add("有功功率");
List<CharacterParamInfoVo> characterParamInfoVos = iCharacterParamService
.getCharacterByName(cpName, plantId, deviceId);
QueryRequestExt queryRequestExt = QueryRequestExt.builder()
.start("1597456800000")
.end("1597460400000")
.build();
for (CharacterParamInfoVo characterParamInfo : characterParamInfoVos) {
QueryExt build = QueryExt.builder()
.aggregator(Aggregator.AVG)
.downsample("1m-max-zero")
.metric(characterParamInfo.getKksCode())
.build();
queryRequestExt.addQuery(build);
}
QueryResponse response;
try {
//获取最后一个时间点
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
//判断并获取最后一个时间点 7天前,每5min一个点
QueryRequestExt queryRequestExt = QueryRequestExt.builder()
.start((lastTime - 7 * 24 * 3600 * 1000) + "")
.end(lastTime + "")
.build();
for (CharacterParamInfoVo characterParamInfo : characterParamInfoVos) {
QueryExt build = QueryExt.builder()
.aggregator(Aggregator.AVG)
.downsample("5m-max-zero")
.metric(characterParamInfo.getKksCode())
.build();
queryRequestExt.addQuery(build);
}
QueryResponse response;
response = OpentsdbOkHttpClient.query(queryRequestExt);
if (response.getResults().size() == 0){
if (response.getResults().size() == 0) {
return null;
}
} catch (IOException | URISyntaxException e) {
return null;
return buildDataPower(characterParamInfoVos, response);
} catch (IOException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
return buildDataPower(characterParamInfoVos, response);
return null;
}
private DataPower buildDataPower(List<CharacterParamInfoVo> characterParamInfos,
......@@ -124,6 +140,7 @@ public class PlantInfoServiceImpl extends BaseServiceImpl<String, PlantInfo> imp
}
String cpName = first.get().getCpName();
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(cpName);
queryResult.getDps().keySet().forEach(arg -> {
Number number = queryResult.getDps().get(arg);
......
......@@ -98,6 +98,12 @@ public class UserInfoServiceImpl implements IUserInfoService {
}
}
public static void main(String[] args) {
PasswordEncoder passwordEncoder = new BCryptPasswordEncoder();
String a = passwordEncoder.encode("123123");
System.out.println(a);
}
@Override
public BaseResponse<Boolean> edit(UserInfoQuery userInfo) {
......
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "hydrogeSystem1Task")
public class HydrogeSystem1Task extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public HydrogeSystem1Task() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, HydrogenSystemWebSocket> webSocketMap =
HydrogenSystemWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = HydrogenSystemWebSocket.deviceMap;
if (webSocketMap.size() == 0 || deviceMap.size() == 0) {
return;
}
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheHydrogeCha(deviceId);
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item -> IS_CONTAINS(item.getCpName(), new String[]{"汽端发电机冷氢温度"})).collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 1) + "", lastTime + "", "");
//组装数据结构
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (KKsCodes.get(queryResult.getMetric()).contains("汽端发电机冷氢温度")) {
DataPower dataPower = map.getOrDefault("rightBottom", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightBottom", dataPower);
}
//左上图
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final HydrogenSystemWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "hydrogeSystem3Task")
public class HydrogeSystem3Task extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public HydrogeSystem3Task() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, HydrogenSystemWebSocket> webSocketMap =
HydrogenSystemWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = HydrogenSystemWebSocket.deviceMap;
if (webSocketMap.size() == 0 || deviceMap.size() == 0) {
return;
}
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
String[] measure_points = new String[]{"发电机机内氢压", "机内氢气湿度", "漏氢含量"};
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheHydrogeCha(deviceId);
//获取kkscode
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item ->
IS_CONTAINS(item.getCpName(), measure_points))
.collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
//获取最后一个数据点的最近一个时间
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 24 * 3600 * 1000) + "", lastTime + "", "1h-avg-zero");
//获取横坐标
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
//组装数据
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (KKsCodes.get(queryResult.getMetric()).contains("发电机机内氢压")) {
//左上图
DataPower dataPower = map.getOrDefault("leftTop",new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftTop", dataPower);
}
if (KKsCodes.get(queryResult.getMetric()).contains("机内氢气湿度")) {
DataPower dataPower = map.getOrDefault("leftBottom",new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftBottom", dataPower);
}
if (KKsCodes.get(queryResult.getMetric()).contains("漏氢含量")) {
DataPower dataPower = map.getOrDefault("rightTop",new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightTop", dataPower);
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final HydrogenSystemWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
......@@ -22,7 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint("/hydrogenSystem/{deviceId}")
public class HydrogenSystemWebSocketServer {
@DependsOn("myApplicationContextAware")
public class HydrogenSystemWebSocket {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
......@@ -31,7 +35,7 @@ public class HydrogenSystemWebSocketServer {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, HydrogenSystemWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, HydrogenSystemWebSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> deviceMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
......@@ -39,6 +43,9 @@ public class HydrogenSystemWebSocketServer {
private Session session;
private String uuid;
private HydrogeSystem1Task hydrogeSystem1Task = (HydrogeSystem1Task) MyApplicationContextAware.getApplicationContext().getBean("hydrogeSystem1Task");
private HydrogeSystem3Task hydrogeSystem3Task = (HydrogeSystem3Task) MyApplicationContextAware.getApplicationContext().getBean("hydrogeSystem3Task");
/**
* 连接建立成功调用的方法
*/
......@@ -50,6 +57,14 @@ public class HydrogenSystemWebSocketServer {
webSocketMap.put(uuid, this);
//加入set中
addOnlineCount();
//启动任务
ScheduleUtil.start(hydrogeSystem1Task,1000);
ScheduleUtil.start(hydrogeSystem3Task,1000*3600);
//已有用户的情况,主动推送一次
if (webSocketMap.size() > 1) {
hydrogeSystem1Task.run();
hydrogeSystem3Task.run();
}
log.info("用户连接:" + deviceId + ",当前在线人数为:" + getOnlineCount());
}
......@@ -60,9 +75,14 @@ public class HydrogenSystemWebSocketServer {
public void onClose() {
if (webSocketMap.containsKey(uuid)) {
webSocketMap.remove(uuid);
deviceMap.remove(uuid);
//从set中删除
subOnlineCount();
}
if (webSocketMap.size() == 0) {
ScheduleUtil.cancel(hydrogeSystem1Task);
ScheduleUtil.cancel(hydrogeSystem3Task);
}
log.info("用户退出:" + uuid + ",当前在线人数为:" + getOnlineCount());
}
......@@ -120,10 +140,10 @@ public class HydrogenSystemWebSocketServer {
}
public static synchronized void addOnlineCount() {
HydrogenSystemWebSocketServer.onlineCount++;
HydrogenSystemWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
HydrogenSystemWebSocketServer.onlineCount--;
HydrogenSystemWebSocket.onlineCount--;
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "indexRealTimeTask")
public class IndexRealTimeTask extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public IndexRealTimeTask() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, IndexRealTimeWebSocket> webSocketMap =
IndexRealTimeWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = IndexRealTimeWebSocket.deviceMap;
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] points = new String[]{
"发电机定子A相电流",
"发电机定子B相电流",
"发电机定子C相电流",
"发电机定子AB线电压",
"发电机定子BC线电压",
"发电机定子CA线电压",
"汽端座振X",
"汽端轴振X",
"励端座振X",
"励端轴振X",
"排油温度",
};
//向每个webSocket推送系统实时数据
List<CharacterParamInfoVo> characterParamInfoList = taskCacheDataService.cacheIndexCPI();
for (String deviceId : groupMap.keySet()) {
//获取kkscode
List<CharacterParamInfoVo> characterParamInfoVos ;
//全场
if ("total".equals(deviceId)) {
characterParamInfoVos = characterParamInfoList;
}else{
characterParamInfoVos = characterParamInfoList.stream().filter(item -> item.getDeviceId().equals(deviceId)).collect(Collectors.toList());
}
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item ->
IS_CONTAINS(item.getCpName(), points))
.collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
//获取最后一个数据点的最近一个时间
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 1) + "", lastTime + "", "");
//获取横坐标
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
//组装数据
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"发电机定子A相电流",
"发电机定子B相电流",
"发电机定子C相电流",
"发电机定子AB线电压",
"发电机定子BC线电压",
"发电机定子CA线电压",
})) {
//右上图
DataPower dataPower = map.getOrDefault("rightTop", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightTop", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"汽端座振X",
"汽端轴振X",
"励端座振X",
"励端轴振X",
})) {
//右上图
DataPower dataPower = map.getOrDefault("rightMiddle", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightMiddle", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"排油温度",
})) {
//右上图
DataPower dataPower = map.getOrDefault("rightBottom", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightBottom", dataPower);
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final IndexRealTimeWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
......@@ -14,15 +18,11 @@ import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 水系统检测
* @author: qh
* @create: 2020-08-17 16:39
**/
@Slf4j
@Component
@ServerEndpoint("/waterSystem/{deviceId}")
public class WaterSystemWebSocketServer {
@ServerEndpoint("/index/realTimeData/{deviceId}")
@DependsOn("myApplicationContextAware")
public class IndexRealTimeWebSocket {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
......@@ -31,13 +31,18 @@ public class WaterSystemWebSocketServer {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, WaterSystemWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, IndexRealTimeWebSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> deviceMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收 建立连接的参数信息
*/
private String uuid;
private IndexRealTimeTask indexRealTimeTask = (IndexRealTimeTask) MyApplicationContextAware.getApplicationContext().getBean("indexRealTimeTask");
/**
* 连接建立成功调用的方法
......@@ -46,10 +51,18 @@ public class WaterSystemWebSocketServer {
public void onOpen(Session session, @PathParam("deviceId") String deviceId) {
this.session = session;
uuid = UUID.randomUUID().toString();
//如果是total 表示全场
deviceMap.put(uuid, deviceId);
webSocketMap.put(uuid, this);
//加入set中
addOnlineCount();
//启动任务
ScheduleUtil.start(indexRealTimeTask, 1000);
//已有用户的情况,主动推送一次
if (webSocketMap.size() > 1) {
indexRealTimeTask.run();
}
log.info("用户连接:" + deviceId + ",当前在线人数为:" + getOnlineCount());
}
......@@ -60,9 +73,13 @@ public class WaterSystemWebSocketServer {
public void onClose() {
if (webSocketMap.containsKey(uuid)) {
webSocketMap.remove(uuid);
deviceMap.remove(uuid);
//从set中删除
subOnlineCount();
}
if (webSocketMap.size() == 0) {
ScheduleUtil.cancel(indexRealTimeTask);
}
log.info("用户退出:" + uuid + ",当前在线人数为:" + getOnlineCount());
}
......@@ -82,8 +99,10 @@ public class WaterSystemWebSocketServer {
}
/**
* @param session
* @param error
* 错误
*
* @param session 会话
* @param error 错误异常
*/
@OnError
public void onError(Session session, Throwable error) {
......@@ -101,6 +120,10 @@ public class WaterSystemWebSocketServer {
/**
* 发送自定义消息
*
* @param message 消息
* @param userId 用户
* @throws IOException 异常
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("发送消息到:" + userId + ",报文:" + message);
......@@ -116,10 +139,10 @@ public class WaterSystemWebSocketServer {
}
public static synchronized void addOnlineCount() {
WaterSystemWebSocketServer.onlineCount++;
IndexRealTimeWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WaterSystemWebSocketServer.onlineCount--;
IndexRealTimeWebSocket.onlineCount--;
}
}
......@@ -3,6 +3,7 @@ package cn.wise.sc.energy.power.plant.business.task;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
......@@ -22,8 +23,9 @@ import java.util.concurrent.ConcurrentHashMap;
**/
@Slf4j
@Component
@DependsOn("myApplicationContextAware")
@ServerEndpoint("/index/{plantCode}/{deviceId}")
public class IndexSystemSocket {
public class IndexSystemWebSocket {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
......@@ -32,7 +34,7 @@ public class IndexSystemSocket {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, IndexSystemSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, IndexSystemWebSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> deviceMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> plantCodeMap = new ConcurrentHashMap<>();
/**
......@@ -60,7 +62,7 @@ public class IndexSystemSocket {
}
/**
* 连接关闭调用的方法
* 连接关0---闭调用的方法
*/
@OnClose
public void onClose() {
......@@ -122,10 +124,10 @@ public class IndexSystemSocket {
}
public static synchronized void addOnlineCount() {
IndexSystemSocket.onlineCount++;
IndexSystemWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
IndexSystemSocket.onlineCount--;
IndexSystemWebSocket.onlineCount--;
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "oilSystem1Task")
public class OilSystem1Task extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public OilSystem1Task() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, OilSystemWebSocket> webSocketMap =
OilSystemWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = OilSystemWebSocket.deviceMap;
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] points = new String[]{
//左上
"汽轮机备用油差压阀进口压力",
"排油烟机1号进口压力",
"排油烟机2号进口压力",
};
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheOilCPI(deviceId);
//获取kkscode
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item ->
IS_CONTAINS(item.getCpName(), points))
.collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
//获取最后一个数据点的最近一个时间
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 24 * 3600 * 1000) + "", lastTime + "", "1h-avg-zero");
//获取横坐标
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
//组装数据
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), points)) {
//左上图
DataPower dataPower = map.getOrDefault("leftTop", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftTop", dataPower);
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final OilSystemWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "oilSystem3Task")
public class OilSystem3Task extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public OilSystem3Task() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, OilSystemWebSocket> webSocketMap =
OilSystemWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = OilSystemWebSocket.deviceMap;
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] lefttop_points = new String[]{
//左上
"空侧交流密封油泵进出口压差",
"空侧直流密封油泵进出口压差",
"密封油油氢压差",
"密封油油氢压差",
"氢侧交流油泵进出口压差",
"氢侧直流油泵进出口压差",
};
String[] leftbottom_points = new String[]{
//左下
"励端轴承氢侧密封油温度",
"汽端轴承氢侧密封油温度",
"空侧冷却器密封油出口温度",
};
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheOilCPI(deviceId);
//获取kkscode
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item ->
IS_CONTAINS(item.getCpName(), ArrayUtil.addAll(lefttop_points, leftbottom_points)))
.collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
//获取最后一个数据点的最近一个时间
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 1) + "", lastTime + "", "");
//获取横坐标
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
//组装数据
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), lefttop_points)) {
//左上图
DataPower dataPower = map.getOrDefault("leftTop", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftTop", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), leftbottom_points)) {
DataPower dataPower = map.getOrDefault("leftBottom", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftBottom", dataPower);
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final OilSystemWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Data
@AllArgsConstructor
static class StudentInfo {
private String name;
}
public static void main(String[] args) {
List<StudentInfo> studentList = new ArrayList<>();
studentList.add(new StudentInfo("李小明"));
studentList.add(new StudentInfo("张小丽"));
studentList.add(new StudentInfo("王大朋"));
studentList.add(new StudentInfo("陈小跑"));
List<StudentInfo> a = studentList.stream().filter(item -> isContains(item.getName(), new String[]{"李", "小"})).collect(Collectors.toList());
System.out.println();
}
static boolean isContains(String compareWords, String... args) {
if (args.length == 0) {
return false;
}
return CollectionUtil.toList(args).stream().map(item -> compareWords.contains(item)).reduce((ei, ej) -> ei || ej).get();
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
......@@ -22,7 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint("/oilSystem/{deviceId}")
public class OilSystemWebSocketServer {
@DependsOn("myApplicationContextAware")
public class OilSystemWebSocket {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
......@@ -30,7 +34,7 @@ public class OilSystemWebSocketServer {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, OilSystemWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, OilSystemWebSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> deviceMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
......@@ -38,6 +42,9 @@ public class OilSystemWebSocketServer {
private Session session;
private String uuid;
private OilSystem3Task oilSystem3Task = (OilSystem3Task) MyApplicationContextAware.getApplicationContext().getBean("oilSystem3Task");
private OilSystem1Task oilSystem1Task = (OilSystem1Task) MyApplicationContextAware.getApplicationContext().getBean("oilSystem1Task");
/**
* 连接建立成功调用的方法
*/
......@@ -49,6 +56,14 @@ public class OilSystemWebSocketServer {
webSocketMap.put(uuid, this);
//加入set中
addOnlineCount();
//启动任务
ScheduleUtil.start(oilSystem3Task,1000);
ScheduleUtil.start(oilSystem1Task,1000*3600);
//已有用户的情况,主动推送一次
if (webSocketMap.size() > 1) {
oilSystem3Task.run();
oilSystem1Task.run();
}
log.info("用户连接:" + deviceId + ",当前在线人数为:" + getOnlineCount());
}
......@@ -59,9 +74,14 @@ public class OilSystemWebSocketServer {
public void onClose() {
if (webSocketMap.containsKey(uuid)) {
webSocketMap.remove(uuid);
deviceMap.remove(uuid);
//从set中删除
subOnlineCount();
}
if (webSocketMap.size() == 0) {
ScheduleUtil.cancel(oilSystem3Task);
ScheduleUtil.cancel(oilSystem1Task);
}
log.info("用户退出:" + uuid + ",当前在线人数为:" + getOnlineCount());
}
......@@ -81,8 +101,9 @@ public class OilSystemWebSocketServer {
}
/**
* @param session
* @param error
* 错误
* @param session 会话
* @param error 错误异常
*/
@OnError
public void onError(Session session, Throwable error) {
......@@ -100,6 +121,9 @@ public class OilSystemWebSocketServer {
/**
* 发送自定义消息
* @param message 消息
* @param userId 用户
* @throws IOException 异常
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("发送消息到:" + userId + ",报文:" + message);
......@@ -115,10 +139,10 @@ public class OilSystemWebSocketServer {
}
public static synchronized void addOnlineCount() {
OilSystemWebSocketServer.onlineCount++;
OilSystemWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
OilSystemWebSocketServer.onlineCount--;
OilSystemWebSocket.onlineCount--;
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* @author neo.shu
* @since 2020/9/17 17:36
*/
@Service
public class TaskCacheDataService {
@Autowired
ICharacterParamService iCharacterParamService;
@Cacheable(value = "hydroge-character", key = "#deviceId")
public List<CharacterParamInfoVo> cacheHydrogeCha(String deviceId) {
List<String> characterNames = new ArrayList<>();
characterNames.add("发电机机内氢压");
characterNames.add("机内氢气湿度");
characterNames.add("漏氢含量");
characterNames.add("汽端发电机冷氢温度");
List<CharacterParamInfoVo> characterParamInfoVos =
iCharacterParamService.getCharacterByName(characterNames, "", deviceId);
return characterParamInfoVos;
}
@Cacheable(value = "oil-character", key = "#deviceId")
public List<CharacterParamInfoVo> cacheOilCPI(String deviceId) {
List<String> characterNames = new ArrayList<>();
//左上角
characterNames.add("空侧交流密封油泵进出口压差");
characterNames.add("空侧直流密封油泵进出口压差");
characterNames.add("密封油油氢压差");
characterNames.add("密封油油氢压差");
characterNames.add("氢侧交流油泵进出口压差");
characterNames.add("氢侧直流油泵进出口压差");
//左下角
characterNames.add("励端轴承氢侧密封油温度");
characterNames.add("汽端轴承氢侧密封油温度");
characterNames.add("空侧冷却器密封油出口温度");
characterNames.add("氢侧冷却器密封油出口温度");
//右上角
characterNames.add("汽轮机备用油差压阀进口压力");
characterNames.add("排油烟机1号进口压力");
characterNames.add("排油烟机2号进口压力");
List<CharacterParamInfoVo> characterParamInfoVos =
iCharacterParamService.getCharacterByName(characterNames, "", deviceId);
return characterParamInfoVos;
}
@Cacheable(value = "index-character")
public List<CharacterParamInfoVo> cacheIndexCPI() {
List<String> characterNames = new ArrayList<>();
//右上
characterNames.add("发电机定子A相电流");
characterNames.add("发电机定子B相电流");
characterNames.add("发电机定子C相电流");
characterNames.add("发电机定子AB线电压");
characterNames.add("发电机定子BC线电压");
characterNames.add("发电机定子CA线电压");
//右中
characterNames.add("汽端座振X");
characterNames.add("汽端轴振X");
characterNames.add("励端座振X");
characterNames.add("励端轴振X");
//右下
characterNames.add("排油温度");
List<CharacterParamInfoVo> characterParamInfoVos =
iCharacterParamService.getCharacterByName(characterNames, "", "");
return characterParamInfoVos;
}
@Cacheable(value = "water-character", key = "#deviceId")
public List<CharacterParamInfoVo> cacheWaterCPI(String deviceId) {
List<String> characterNames = new ArrayList<>();
//左下 小时
characterNames.add("转子冷却水流量");
characterNames.add("定子冷却水流量");
//左上
characterNames.add("转子冷却水泵A出口压力");
characterNames.add("转子冷却水泵B出口压力");
//右上
characterNames.add("定子线圈进水温度");
characterNames.add("定子线圈出水温度");
characterNames.add("转子线圈进水温度");
characterNames.add("转子线圈出水温度");
//右下
characterNames.add("定子线圈进水电导率");
characterNames.add("定子线圈进水pH值");
characterNames.add("转子线圈进水电导率");
characterNames.add("转子线圈进水pH值");
List<CharacterParamInfoVo> characterParamInfoVos =
iCharacterParamService.getCharacterByName(characterNames, "", deviceId);
return characterParamInfoVos;
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.hutool.core.collection.CollectionUtil;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "waterSystem1Task")
public class WaterSystem1Task extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public WaterSystem1Task() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, WaterSystemWebSocket> webSocketMap =
WaterSystemWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = WaterSystemWebSocket.deviceMap;
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] points = new String[]{
//左下
"转子冷却水流量",
"定子冷却水流量",
};
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheWaterCPI(deviceId);
//获取kkscode
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item ->
IS_CONTAINS(item.getCpName(),points))
.collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
//获取最后一个数据点的最近一个时间
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 24 * 3600 * 1000) + "", lastTime + "", "1h-avg-zero");
//获取横坐标
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
//组装数据
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"转子冷却水流量",
"定子冷却水流量",
})) {
//左下图
DataPower dataPower = map.getOrDefault("leftBottom", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftBottom", dataPower);
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final WaterSystemWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import javafx.scene.shape.VLineTo;
import lombok.AllArgsConstructor;
import lombok.Data;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "waterSystem3Task")
public class WaterSystem3Task extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public WaterSystem3Task() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, WaterSystemWebSocket> webSocketMap =
WaterSystemWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = WaterSystemWebSocket.deviceMap;
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] points = new String[]{
//左上
"转子冷却水泵A出口压力",
"转子冷却水泵B出口压力",
//右上
"定子线圈进水温度",
"定子线圈出水温度",
"转子线圈进水温度",
"转子线圈出水温度",
//右下
"定子线圈进水电导率",
"定子线圈进水pH值",
"转子线圈进水电导率",
"转子线圈进水pH值",
};
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheWaterCPI(deviceId);
//获取kkscode
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item ->
IS_CONTAINS(item.getCpName(),points))
.collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
//获取最后一个数据点的最近一个时间
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 1) + "", lastTime + "", "");
//获取横坐标
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
//组装数据
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"转子冷却水泵A出口压力",
"转子冷却水泵B出口压力",
})) {
//左上图
DataPower dataPower = map.getOrDefault("leftTop", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftTop", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"定子线圈进水电导率",
"定子线圈进水pH值",
"转子线圈进水电导率",
"转子线圈进水pH值",
})) {
DataPower dataPower = map.getOrDefault("rightBottom", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightBottom", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
//右上
"定子线圈进水温度",
"定子线圈出水温度",
"转子线圈进水温度",
"转子线圈出水温度",
})) {
DataPower dataPower = map.getOrDefault("rightTop", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightTop", dataPower);
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final WaterSystemWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 水系统检测
* @author: qh
* @create: 2020-08-17 16:39
**/
@Slf4j
@Component
@ServerEndpoint("/waterSystem/{deviceId}")
@DependsOn("myApplicationContextAware")
public class WaterSystemWebSocket {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, WaterSystemWebSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> deviceMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
private String uuid;
private WaterSystem3Task waterSystem3Task = (WaterSystem3Task) MyApplicationContextAware.getApplicationContext().getBean("waterSystem3Task");
private WaterSystem1Task waterSystem1Task = (WaterSystem1Task) MyApplicationContextAware.getApplicationContext().getBean("waterSystem1Task");
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("deviceId") String deviceId) {
this.session = session;
uuid = UUID.randomUUID().toString();
deviceMap.put(uuid, deviceId);
webSocketMap.put(uuid, this);
//加入set中
addOnlineCount();
//启动任务
ScheduleUtil.start(waterSystem3Task,1000);
ScheduleUtil.start(waterSystem1Task,1000*3600);
//已有用户的情况,主动推送一次
if (webSocketMap.size() > 1) {
waterSystem3Task.run();
waterSystem1Task.run();
}
log.info("用户连接:" + deviceId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(uuid)) {
webSocketMap.remove(uuid);
deviceMap.remove(uuid);
//从set中删除
subOnlineCount();
}
if (webSocketMap.size() == 0) {
ScheduleUtil.cancel(waterSystem3Task);
ScheduleUtil.cancel(waterSystem1Task);
}
log.info("用户退出:" + uuid + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("用户消息:" + uuid + ",报文:" + message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) {
webSocketMap.get(uuid).sendMessage("111111111");
}
}
/**
* 错误
* @param session 会话
* @param error 错误异常
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.uuid + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
* @param message 消息
* @param userId 用户
* @throws IOException 异常
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用户" + userId + ",不在线!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WaterSystemWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() { WaterSystemWebSocket.onlineCount--; }
}
package cn.wise.sc.energy.power.plant.business.task;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint("/index/realTimeData/{plantCode}")
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> plantCodeMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收 建立连接的参数信息
*/
private String plantCode = "";
private String uuid;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("plantCode") String plantCode) {
this.session = session;
this.plantCode = plantCode;
String uid = UUID.randomUUID().toString();
uuid = uid;
plantCodeMap.put(uid, plantCode);
// if (webSocketMap.containsKey(plantCode)) {
// webSocketMap.remove(plantCode);
// webSocketMap.put(plantCode, this);
// //加入set中
// } else {
webSocketMap.put(uuid, this);
//加入set中
addOnlineCount();
//在线数加1
// }
log.info("用户连接:" + plantCode + ",当前在线人数为:" + getOnlineCount());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(uuid)) {
webSocketMap.remove(uuid);
//从set中删除
subOnlineCount();
}
log.info("用户退出:" + uuid + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("用户消息:" + uuid + ",报文:" + message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) {
webSocketMap.get(uuid).sendMessage("111111111");
}
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.uuid + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用户" + userId + ",不在线!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
package cn.wise.sc.energy.power.plant.business.task.schedule;
import cn.hutool.core.collection.CollectionUtil;
public abstract class ScheduleTask implements Runnable {
private String id;
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public ScheduleTask(String id) {
this.id = id;
}
@Override
public abstract void run();
public static boolean IS_CONTAINS(String compareWords, String... args) {
if (args.length == 0) {
return false;
}
//map reduce 判断是否包含给定的列表
return CollectionUtil.toList(args).stream().map(item -> compareWords.contains(item)).reduce((ei, ej) -> ei || ej).get(); }
}
\ No newline at end of file
package cn.wise.sc.energy.power.plant.business.task.schedule;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
* 定时任务工具类
*
*/
public class ScheduleUtil {
private static final Log logger = LogFactory.getLog(ScheduleUtil.class);
private static ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
private static Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>();
static{
threadPoolTaskScheduler.initialize();
logger.info("初始化线程池...");
}
/**
* 启动某定时任务,到时间点就运行一次
* @param scheduleTask
* @param startTime
*/
public static void start(ScheduleTask scheduleTask, Date startTime){
if (isExist(scheduleTask.getId())){
logger.info("启动定时任务"+ scheduleTask.getId()+"失败,任务已存在");
return;
}
ScheduledFuture<?>scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask,startTime);
scheduledFutureMap.put(scheduleTask.getId(),scheduledFuture);
logger.info("启动定时任务"+ scheduleTask.getId()+",执行时间为"+ startTime);
}
/**
* 启动某定时任务,以固定周期运行
* @param scheduleTask
* @param period
*/
public static void start(ScheduleTask scheduleTask, long period){
if (isExist(scheduleTask.getId())){
logger.info("启动定时任务"+ scheduleTask.getId()+"失败,任务已存在");
return;
}
ScheduledFuture<?>scheduledFuture = threadPoolTaskScheduler.scheduleAtFixedRate(scheduleTask,period);
scheduledFutureMap.put(scheduleTask.getId(),scheduledFuture);
logger.info("启动定时任务" + scheduleTask.getId() + ",执行周期为" + period + "毫秒");
}
/**
* 取消某定时任务
* @param scheduleTask*/
public static void cancel(ScheduleTask scheduleTask){
ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(scheduleTask.getId());
if(scheduledFuture != null && !scheduledFuture.isCancelled()){
scheduledFuture.cancel(false);
}
scheduledFutureMap.remove(scheduleTask.getId());
logger.info("取消定时任务"+ scheduleTask.getId());
}
/**
* 修改定时任务执行时间
* @param scheduleTask
* @param startTime
*/
public static void reset(ScheduleTask scheduleTask,Date startTime){
//先取消定时任务
String id = scheduleTask.getId();
ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(id);
if(scheduledFuture != null && !scheduledFuture.isCancelled()){
scheduledFuture.cancel(false);
}
scheduledFutureMap.remove(id);
//然后启动新的定时任务
scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask,startTime);
scheduledFutureMap.put(id,scheduledFuture);
logger.info("重置定时任务"+ id+",执行时间为"+ startTime);
}
/**
* 判断某个定时任务是否存在或已经取消
* @param id
*/
public static Boolean isExist(String id) {
ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(id);
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
return true;
}
return false;
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ package cn.wise.sc.energy.power.plant.business.utils.dfs;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.file.FileReader;
import com.aspose.words.Document;
import com.aspose.words.SaveOptions;
import org.apache.commons.io.FilenameUtils;
import org.csource.common.MyException;
import org.csource.common.NameValuePair;
......
......@@ -26,11 +26,11 @@
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment