Browse Source

kafka推送事件(AI)数据

develop
王兴琳 1 month ago
parent
commit
93b797d3f4
  1. 7
      ruoyi-admin/src/main/resources/application.yml
  2. 5
      zc-business/pom.xml
  3. 16
      zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java
  4. 55
      zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java

7
ruoyi-admin/src/main/resources/application.yml

@@ -192,3 +192,10 @@ iot:
# 允许访问的ip地址 # 允许访问的ip地址
allowed: allowed:
ips: 10.0.111.11,10.168.73.54,10.168.71.194 ips: 10.0.111.11,10.168.73.54,10.168.71.194
# Kafka配置
kafka:
bootstrap-servers: 121.40.97.235:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic: eventAi

5
zc-business/pom.xml

@@ -101,7 +101,10 @@
<groupId>com.ruoyi</groupId> <groupId>com.ruoyi</groupId>
<artifactId>ruoyi-quartz</artifactId> <artifactId>ruoyi-quartz</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.jsoup</groupId> <groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId> <artifactId>jsoup</artifactId>

16
zc-business/src/main/java/com/zc/business/message/device/handler/DeviceMessageHandler.java

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.gson.Gson;
import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.core.redis.RedisCache;
import com.zc.business.constant.RedisKeyConstants; import com.zc.business.constant.RedisKeyConstants;
import com.zc.business.domain.DcDevice; import com.zc.business.domain.DcDevice;
@@ -13,6 +14,7 @@ import com.zc.business.domain.DcMeteorologicalDetectorData;
import com.zc.business.domain.DcWarning; import com.zc.business.domain.DcWarning;
import com.zc.business.domain.MdDeviceData; import com.zc.business.domain.MdDeviceData;
import com.zc.business.enums.*; import com.zc.business.enums.*;
import com.zc.business.message.device.subscribe.KafkaTopicProducer;
import com.zc.business.service.*; import com.zc.business.service.*;
import com.zc.common.core.websocket.WebSocketService; import com.zc.common.core.websocket.WebSocketService;
import com.zc.common.core.websocket.constant.WebSocketEvent; import com.zc.common.core.websocket.constant.WebSocketEvent;
@@ -62,6 +64,8 @@ public class DeviceMessageHandler {
@Autowired @Autowired
private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService; private IDcMeteorologicalDetectorDataService meteorologicalDetectorDataService;
@Resource
private KafkaTopicProducer kafkaTopicProducer;
/** /**
@@ -359,9 +363,20 @@ public class DeviceMessageHandler {
int WarningLevel = data.getInteger("visibilityLevel"); int WarningLevel = data.getInteger("visibilityLevel");
if (WarningLevel != UniversalEnum.ZERO.getNumber()) { if (WarningLevel != UniversalEnum.ZERO.getNumber()) {
dcWarningService.insertDcWarning(dcWarning); dcWarningService.insertDcWarning(dcWarning);
//kafka消息推送
// 使用Gson将对象转换为JSON字符串
Gson gson = new Gson();
String jsonString = gson.toJson(dcWarning);
kafkaTopicProducer.KafkaTopicProducer(jsonString);
} }
} else { } else {
dcWarningService.insertDcWarning(dcWarning); dcWarningService.insertDcWarning(dcWarning);
//kafka消息推送
// 使用Gson将对象转换为JSON字符串
Gson gson = new Gson();
String jsonString = gson.toJson(dcWarning);
kafkaTopicProducer.KafkaTopicProducer(jsonString);
} }
} }
@@ -727,4 +742,5 @@ public class DeviceMessageHandler {
middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData); middleDatabaseService.insertMiddleDatabaseDeviceData(mdDeviceData);
} }
} }

55
zc-business/src/main/java/com/zc/business/message/device/subscribe/KafkaTopicProducer.java

@@ -0,0 +1,55 @@
package com.zc.business.message.device.subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
*
*/
@Slf4j
public class KafkaTopicProducer {
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${kafka.producer.key-serializer}")
private String keySerializer;
@Value("${kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${kafka.producer.topic}")
private String topic;
public void KafkaTopicProducer(String event) {
// 配置生产者的属性
Properties props = new Properties();
props.put("bootstrap.servers", servers); // Kafka broker地址
props.put("key.serializer", keySerializer);
props.put("value.serializer",valueSerializer);
// 创建Kafka生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
/* // 准备要发送的消息
String topic = "eventAi";
*//*String key = "key2";*//*
String value = "这是没有key的测试信息";*/
try {
// 发送消息,并获取元数据(异步)
// ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, event);
RecordMetadata metadata = producer.send(record).get();
// 数据信息
log.info(record.key(), record.value(), metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.close();
}
}
}
Loading…
Cancel
Save