Commit 7d626c3e authored by shulidong's avatar shulidong

添加趋势socket,bug修复

parent 70511567
......@@ -219,6 +219,24 @@
<scope>system</scope> <!--system,类似provided,需要显式提供依赖的jar以后,Maven就不会在Repository中查找它-->
<systemPath>${basedir}/lib/words.jar</systemPath> <!--项目根目录下的lib文件夹下-->
</dependency>
<!--easypoi-->
<dependency>
<groupId>cn.afterturn</groupId>
<artifactId>easypoi-base</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jcommon</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>1.5.0</version>
</dependency>
<!--easypoi-->
</dependencies>
<build>
<plugins>
......
......@@ -58,6 +58,7 @@ public class DataPower implements Serializable {
* Y轴值集合
*/
private List<String> value = new ArrayList<>();
private List<String> singleType = new ArrayList<>();
}
}
......@@ -226,14 +226,13 @@ public class OpentsdbOkHttpClient {
final Long[] latestTime = {0L};
if (!CollectionUtils.isEmpty(results)) {
results.stream().forEach(result -> {
timeList.add(result.getTimestamp());
if (latestTime[0] < result.getTimestamp()) {
latestTime[0] = result.getTimestamp();
}
});
return timeList.get(0);
return latestTime[0];
} else {
return 0L;
return System.currentTimeMillis();
}
} catch (IOException e) {
e.printStackTrace();
......
......@@ -71,6 +71,8 @@ public interface ICharacterParamService extends IBaseService<String, CharacterPa
QueryResponse getRealTimeDataByKksCode(List<String> kksCodes, String start, String end, String downSample);
QueryResponse getRealTimeDataByKksCode(Map<String, String> kksCodes, String start, String end, String downSample);
/**
* 根据kksCode获取功率实时数据 4小时一统计
*
......
......@@ -242,7 +242,50 @@ public class CharacterParamServiceImpl extends BaseServiceImpl<String, Character
//获取最后一条数据 然后根据这个查询
for (CharacterParamInfo characterParamInfo : characterParamInfos) {
QueryExt build = QueryExt.builder()
.aggregator(Aggregator.AVG)
.aggregator(Aggregator.NONE)
.metric(characterParamInfo.getKksCode())
.build();
if (StrUtil.isNotBlank(downSample)) {
build.setDownsample(downSample);
}
queryRequestExt.addQuery(build);
}
QueryResponse response;
try {
response = OpentsdbOkHttpClient.query(queryRequestExt);
//没有数据时
if (response.getResults().size() == 0) {
response = enterWhenItIsARind(characterParamInfos);
}
return response;
} catch (IOException | URISyntaxException e) {
log.error("根据kksCode获取当前start时候下的实时数据失败!");
response = enterWhenItIsARind(characterParamInfos);
return response;
}
}
@Override
public QueryResponse getRealTimeDataByKksCode(Map<String,String> kksCodes, String start, String end, String downSample) {
ResponseEnum.COLLECTION_NOT_ILLEGAL.assertCollectionNotILLEGAL(kksCodes.keySet());
//去数据库匹配测点
List<CharacterParamInfo> characterParamInfos = characterParamRepository.findAll((Specification<CharacterParamInfo>) (root, query, criteriaBuilder) -> {
Path<String> kkscodePath = root.get("kksCode");
CriteriaBuilder.In<String> in = criteriaBuilder.in(kkscodePath);
for (String kkscode : kksCodes.keySet()) {
in.value(kkscode);
}
return in;
});
QueryRequestExt queryRequestExt = QueryRequestExt.builder()
.start(start)
.end(end)
.build();
//获取最后一条数据 然后根据这个查询
for (CharacterParamInfo characterParamInfo : characterParamInfos) {
QueryExt build = QueryExt.builder()
.aggregator(Aggregator.NONE)
.metric(characterParamInfo.getKksCode())
.build();
if (StrUtil.isNotBlank(downSample)) {
......
......@@ -92,6 +92,9 @@ public class PlantInfoServiceImpl extends BaseServiceImpl<String, PlantInfo> imp
try {
//获取最后一个时间点
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
if(lastTime==0){
lastTime = System.currentTimeMillis();
}
//判断并获取最后一个时间点 7天前,每5min一个点
QueryRequestExt queryRequestExt = QueryRequestExt.builder()
.start((lastTime - 7 * 24 * 3600 * 1000) + "")
......
package cn.wise.sc.energy.power.plant.business.task;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
......@@ -50,17 +52,25 @@ public class IndexRealTimeTask extends ScheduleTask {
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] points = new String[]{
"发电机定子A相电流",
"发电机定子B相电流",
"发电机定子C相电流",
"发电机定子AB线电压",
"发电机定子BC线电压",
"发电机定子CA线电压",
//右上
"定子C相电流",
"定子CA线电压",
"定子负序电流",
//右中
"汽端座振X",
"汽端轴振X",
"励端座振X",
"励端轴振X",
//右下
"排油温度",
//左下极坐标
"层间温度",
"上层线圈出水温度",
"下层线圈出水温度",
//中间
"转速1",
"有功功率",
};
//向每个webSocket推送系统实时数据
......@@ -97,12 +107,9 @@ public class IndexRealTimeTask extends ScheduleTask {
});
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"发电机定子A相电流",
"发电机定子B相电流",
"发电机定子C相电流",
"发电机定子AB线电压",
"发电机定子BC线电压",
"发电机定子CA线电压",
"定子C相电流",
"定子CA线电压",
"定子负序电流",
})) {
//右上图
DataPower dataPower = map.getOrDefault("rightTop", new DataPower());
......@@ -131,12 +138,67 @@ public class IndexRealTimeTask extends ScheduleTask {
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightBottom", dataPower);
}
if (!"total".equals(deviceId)){
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
//左下极坐标
"层间温度",
})) {
//左下图
DataPower dataPower = map.getOrDefault("leftBottom1", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftBottom1", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
//左下极坐标
"下层线圈出水温度",
})) {
//左下图
DataPower dataPower = map.getOrDefault("leftBottom2", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftBottom2", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
//左下极坐标
"上层线圈出水温度",
})) {
//左下图
DataPower dataPower = map.getOrDefault("leftBottom3", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftBottom3", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"有功功率",
})) {
DataPower dataPower = map.getOrDefault("有功", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("有功", dataPower);
}
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), new String[]{
"转速1",
})) {
DataPower dataPower = map.getOrDefault("转速", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("转速", dataPower);
}
}
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final IndexRealTimeWebSocket webSocket = webSocketMap.get(entry.getKey());
final SendWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
......
......@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint("/index/realTimeData/{deviceId}")
@DependsOn("myApplicationContextAware")
public class IndexRealTimeWebSocket {
public class IndexRealTimeWebSocket implements SendWebSocket{
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
......@@ -113,6 +113,7 @@ public class IndexRealTimeWebSocket {
/**
* 实现服务器主动推送
*/
@Override
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
......
......@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
@DependsOn("myApplicationContextAware")
@ServerEndpoint("/index/{plantCode}/{deviceId}")
@Deprecated
public class IndexSystemWebSocket {
/**
......
......@@ -54,7 +54,7 @@ public class OilSystem1Task extends ScheduleTask {
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//需要区分的测点
String[] points = new String[]{
//
//
"汽轮机备用油差压阀进口压力",
"排油烟机1号进口压力",
"排油烟机2号进口压力",
......@@ -88,10 +88,10 @@ public class OilSystem1Task extends ScheduleTask {
powerPoints.setValue(finalList);
if (IS_CONTAINS(KKsCodes.get(queryResult.getMetric()), points)) {
//左上图
DataPower dataPower = map.getOrDefault("leftTop", new DataPower());
DataPower dataPower = map.getOrDefault("rightTop", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("leftTop", dataPower);
map.putIfAbsent("rightTop", dataPower);
}
}
try {
......
......@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
* @create: 2020-05-20 16:00
*/
@Component
@Deprecated
public class ScheduledModel {
final
......
package cn.wise.sc.energy.power.plant.business.task;
import java.io.IOException;
/**
* @author neo.shu
* @since 2020/9/24 20:47
*/
public interface SendWebSocket {
public void sendMessage(String message) throws IOException;
}
......@@ -65,12 +65,9 @@ public class TaskCacheDataService {
public List<CharacterParamInfoVo> cacheIndexCPI() {
List<String> characterNames = new ArrayList<>();
//右上
characterNames.add("发电机定子A相电流");
characterNames.add("发电机定子B相电流");
characterNames.add("发电机定子C相电流");
characterNames.add("发电机定子AB线电压");
characterNames.add("发电机定子BC线电压");
characterNames.add("发电机定子CA线电压");
characterNames.add("定子C相电流");
characterNames.add("定子CA线电压");
characterNames.add("定子负序电流");
//右中
characterNames.add("汽端座振X");
characterNames.add("汽端轴振X");
......@@ -78,6 +75,13 @@ public class TaskCacheDataService {
characterNames.add("励端轴振X");
//右下
characterNames.add("排油温度");
//左下
characterNames.add("层间温度");
characterNames.add("上层线圈出水温度");
characterNames.add("下层线圈出水温度");
//中间
characterNames.add("转速1");
characterNames.add("有功功率");
List<CharacterParamInfoVo> characterParamInfoVos =
iCharacterParamService.getCharacterByName(characterNames, "", "");
return characterParamInfoVos;
......
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.bean.DataPower;
import cn.wise.sc.energy.power.plant.business.domain.vo.CharacterParamInfoVo;
import cn.wise.sc.energy.power.plant.business.opentsdb.OpentsdbOkHttpClient;
import cn.wise.sc.energy.power.plant.business.service.ICharacterParamService;
import cn.wise.sc.energy.power.plant.business.service.IEventInfoService;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleTask;
import com.alibaba.fastjson.JSON;
import net.opentsdb.client.api.query.response.QueryResponse;
import net.opentsdb.client.bean.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* @author neo.shu
* @since 2020/9/17 17:30
*/
@Service(value = "tendencyTask")
public class TendencyTask extends ScheduleTask {
@Autowired
ICharacterParamService iCharacterParamService;
@Autowired
IEventInfoService iEventInfoService;
@Autowired
TaskCacheDataService taskCacheDataService;
public TendencyTask() {
super(UUID.randomUUID().toString());
}
@Override
public void run() {
ConcurrentHashMap<String, TendencyWebSocket> webSocketMap =
TendencyWebSocket.webSocketMap;
ConcurrentHashMap<String, String> deviceMap = HydrogenSystemWebSocket.deviceMap;
if (webSocketMap.size() == 0 || deviceMap.size() == 0) {
return;
}
//给每个deviceid分组,然后统一查询,进行推送
Map<String, List<Map.Entry<String, String>>> groupMap = deviceMap.entrySet().stream().collect(Collectors.groupingBy(c -> c.getValue()));
//向每个webSocket推送系统实时数据
for (String deviceId : groupMap.keySet()) {
List<CharacterParamInfoVo> characterParamInfoVos = taskCacheDataService.cacheHydrogeCha(deviceId);
Map<String, String> KKsCodes = characterParamInfoVos.stream().filter(item -> IS_CONTAINS(item.getCpName(), new String[]{"汽端发电机冷氢温度"})).collect(Collectors.toMap(CharacterParamInfoVo::getKksCode, CharacterParamInfoVo::getCpName));
Long lastTime = OpentsdbOkHttpClient.queryLast(characterParamInfoVos);
QueryResponse response = iCharacterParamService
.getRealTimeDataByKksCode(new ArrayList<>(KKsCodes.keySet()), (lastTime - 1) + "", lastTime + "", "");
//组装数据结构
List<Object> xAxis = response.getResults().get(0).getDps().keySet().stream().map(item -> item.toString()).collect(Collectors.toList());
Map<String, DataPower> map = new HashMap<>();
for (QueryResult queryResult : response.getResults()) {
DataPower.PowerPoints powerPoints = new DataPower.PowerPoints();
powerPoints.setKksCode(queryResult.getMetric());
powerPoints.setName(KKsCodes.get(queryResult.getMetric()));
List<String> finalList = new ArrayList<>();
queryResult.getDps().values().stream().forEach(item -> {
finalList.add(item.toString());
});
powerPoints.setValue(finalList);
if (KKsCodes.get(queryResult.getMetric()).contains("汽端发电机冷氢温度")) {
DataPower dataPower = map.getOrDefault("rightBottom", new DataPower());
dataPower.setXAxis(xAxis);
dataPower.getDataList().add(powerPoints);
map.putIfAbsent("rightBottom", dataPower);
}
//左上图
}
try {
List<Map.Entry<String, String>> deviceGroupMap = groupMap.get(deviceId);
//根据deviceid统一发送
for (Map.Entry<String, String> entry : deviceGroupMap) {
final SendWebSocket webSocket = webSocketMap.get(entry.getKey());
webSocket.sendMessage(JSON.toJSONString(map));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package cn.wise.sc.energy.power.plant.business.task;
import cn.wise.sc.energy.power.plant.business.config.MyApplicationContextAware;
import cn.wise.sc.energy.power.plant.business.task.schedule.ScheduleUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @description: 趋势实时数据的
* @author: neo.shu
* @create: 2020-08-17 14:07
**/
@Slf4j
@Component
@ServerEndpoint("/tendency/{deviceId}")
@DependsOn("myApplicationContextAware")
public class TendencyWebSocket implements SendWebSocket{
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
public static ConcurrentHashMap<String, TendencyWebSocket> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, String> deviceMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
private String uuid;
private HydrogeSystem1Task hydrogeSystem1Task = (HydrogeSystem1Task) MyApplicationContextAware.getApplicationContext().getBean("hydrogeSystem1Task");
private HydrogeSystem3Task hydrogeSystem3Task = (HydrogeSystem3Task) MyApplicationContextAware.getApplicationContext().getBean("hydrogeSystem3Task");
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("deviceId") String deviceId) {
this.session = session;
uuid = UUID.randomUUID().toString();
deviceMap.put(uuid, deviceId);
webSocketMap.put(uuid, this);
//加入set中
addOnlineCount();
//启动任务
ScheduleUtil.start(hydrogeSystem1Task,1000);
ScheduleUtil.start(hydrogeSystem3Task,1000*3600);
//已有用户的情况,主动推送一次
if (webSocketMap.size() > 1) {
hydrogeSystem1Task.run();
hydrogeSystem3Task.run();
}
log.info("用户连接:" + deviceId + ",当前在线人数为:" + getOnlineCount());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(uuid)) {
webSocketMap.remove(uuid);
deviceMap.remove(uuid);
//从set中删除
subOnlineCount();
}
if (webSocketMap.size() == 0) {
ScheduleUtil.cancel(hydrogeSystem1Task);
ScheduleUtil.cancel(hydrogeSystem3Task);
}
log.info("用户退出:" + uuid + ",当前在线人数为:" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) throws IOException {
log.info("用户消息:" + uuid + ",报文:" + message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message)) {
webSocketMap.get(uuid).sendMessage("111111111");
}
}
/**
* 错误
* @param session 会话
* @param error 错误异常
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.uuid + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发送自定义消息
* @param message 消息
* @param userId 用户
* @throws IOException 异常
*/
public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
log.info("发送消息到:" + userId + ",报文:" + message);
if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
webSocketMap.get(userId).sendMessage(message);
} else {
log.error("用户" + userId + ",不在线!");
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
TendencyWebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
TendencyWebSocket.onlineCount--;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment