Commit c2410dad by zzrdark

1.mqtt

2.日志上传模块
parent 475bb004
......@@ -28,10 +28,7 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
......
......@@ -12,6 +12,41 @@
<artifactId>cneeds-server-logupload</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.mx.cneeds</groupId>
<artifactId>cneeds-common-utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.mx.cneeds</groupId>
<artifactId>cneeds-common-pojo</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.mx.cneeds</groupId>
<artifactId>cneeds-common-data</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
......
package com.mx.cneeds.server.logupload.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.mx.cneeds.common.pager.PageUtils;
import com.mx.cneeds.server.entity.DeviceLogEntity;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
public interface DeviceLogService extends IService<DeviceLogEntity> {
PageUtils queryPage(Map<String, Object> params);
void deleteBatch(Long[] deviceLogIds);
void changeStatus(Long logId);
}
\ No newline at end of file
package com.mx.cneeds.server.logupload.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.mx.cneeds.common.pager.PageUtils;
import com.mx.cneeds.common.pager.Query;
import com.mx.cneeds.server.dao.DeviceLogDao;
import com.mx.cneeds.server.entity.DeviceLogEntity;
import com.mx.cneeds.server.logupload.service.DeviceLogService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Map;
/**
* @ClassName DeviceLogServiceImpl
* @Author zzrdark
* @Date 2020-03-31 14:14
* @Description TODO
**/
@Service
public class DeviceLogServiceImpl extends ServiceImpl<DeviceLogDao, DeviceLogEntity> implements DeviceLogService {
@Override
public PageUtils queryPage(Map<String, Object> params) {
IPage<DeviceLogEntity> page = this.page(
new Query<DeviceLogEntity>().getPage(params),
new QueryWrapper<DeviceLogEntity>()
);
return new PageUtils(page);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteBatch(Long[] deviceLogIds) {
//删除
this.removeByIds(Arrays.asList(deviceLogIds));
}
@Override
public void changeStatus(Long logId){
DeviceLogEntity deviceLogEntity = new DeviceLogEntity();
deviceLogEntity.setStatus(1);
getBaseMapper().update(deviceLogEntity, new UpdateWrapper<DeviceLogEntity>().eq("logId", logId));
}
}
package com.mx.cneeds.server.logupload.web;
import com.mx.cneeds.common.pager.PageUtils;
import com.mx.cneeds.common.result.R;
import com.mx.cneeds.server.logupload.service.DeviceLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.io.File;
import java.util.List;
import java.util.Map;
/**
* @ClassName LogUploadController
* @Author zzrdark
* @Date 2020-03-27 15:26
* @Description TODO
**/
@Controller
@RequestMapping("/log/upload")
public class LogUploadController {
@Autowired
private DeviceLogService deviceLogService;
@RequestMapping("/addLogInfo")
public void wechatUploadLog(
String imei,
String logId,
List<File> files,
String bugTitle,
String steps,
Integer probability){
}
@RequestMapping("/addLogFile")
public void uploadLogFile(
String imei,
String logId,
File logFile ){
}
/**
* 列表
*/
@RequestMapping("/list")
public PageUtils list(@RequestParam Map<String, Object> params){
PageUtils page = deviceLogService.queryPage(params);
return page;
}
@RequestMapping("/delete")
public R delete(@RequestBody Long[] userIds){
deviceLogService.deleteBatch(userIds);
return R.ok();
}
/**
* 下载日志文件
*/
@RequestMapping("/downloadfile")
public void downFile(){
}
@RequestMapping("/finishLog")
public R finishLog(Long logId){
deviceLogService.changeStatus(logId);
return R.ok();
}
}
......@@ -45,6 +45,11 @@
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
<build>
......
package com.mx.cneeds.server.user.mqtt.config;
import com.mx.cneeds.server.user.mqtt.event.MqttEvent;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @ClassName MqttConfiguration
* @author zzrdark
* @Date 2019-12-26 11:42
* @Description TODO
**/
@Configuration
@Slf4j
public class MqttConfiguration {
@Autowired
private MqttProperties mqttProperties;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttProperties.getUsername());
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* 接收通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound(@Autowired MessageChannel mqttInputChannel) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),
"hello","hello1");
adapter.setCompletionTimeout(Integer.valueOf(mqttProperties.getCompletionTimeout()));
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel);
return adapter;
}
/**
* 事件触发
*/
@Autowired
private ApplicationEventPublisher eventPublisher;
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
String qos = message.getHeaders().get("mqtt_receivedQos").toString();
/*if("hello".equalsIgnoreCase(topic)){
System.out.println("hello,fuckXX,"+message.getPayload().toString());
}else if("hello1".equalsIgnoreCase(topic)){
System.out.println("hello1,fuckXX,"+message.getPayload().toString());
}*/
eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));
log.info("topic:"+topic+" Qos:"+qos+" message:"+message.getPayload());
}
};
}
//发送数据
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
package com.mx.cneeds.server.user.mqtt.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author zzrdark
*/
@ConfigurationProperties(prefix = "spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {
private String username;
private String password;
private String hostUrl;
private String clientId;
private String defaultTopic;
private String completionTimeout;
private Integer keepAlive;
}
\ No newline at end of file
package com.mx.cneeds.server.user.mqtt.event;
import lombok.Data;
import org.springframework.context.ApplicationEvent;
/**
* @ClassName MqttEvent
* @Author zzrdark
* @Date 2019-12-26 14:59
* @Description TODO
**/
@Data
public class MqttEvent extends ApplicationEvent {
/**
*
*/
private String topic;
/**
* 发送的消息
*/
private String message;
public MqttEvent(){
super(null);
}
public MqttEvent(Object source, String topic, String message) {
super(source);
this.topic = topic;
this.message = message;
}
}
package com.mx.cneeds.server.user.mqtt.listener;
import com.mx.cneeds.server.user.mqtt.event.MqttEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
/**
* @ClassName JobListener
* @Author zzrdark
* @Date 2019-12-26 15:06
* @Description 消费消息
**/
@Component
@Slf4j
public class JobListener {
/**
* 监听topic
* @param mqttEvent
*/
@EventListener(condition = "# mqttEvent.topic.equals(T(com.zzr.mqtt.qmemqtt.defalut.utils.TopicName).ROLL_CALL_2.getValue())")
public void onEmqttCall(MqttEvent mqttEvent){
log.info("接收到消息:"+mqttEvent.getMessage());
}
@EventListener(condition ="@ emqttPredicate.test(#mqttEvent)")
public void onEmqttCallTest(MqttEvent mqttEvent){
log.info("测试通过:"+mqttEvent.getMessage());
}
}
package com.mx.cneeds.server.user.mqtt.server;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @ClassName MqttServer
* @Author zzrdark
* @Date 2019-12-26 15:10
* @Description TODO
**/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttServer {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
}
package com.mx.cneeds.server.user.mqtt.utils;
import com.mx.cneeds.server.user.mqtt.event.MqttEvent;
import org.springframework.stereotype.Component;
/**
* @ClassName EmqttPredicate
* @Author zzrdark
* @Date 2019-12-26 15:11
* @Description TODO
**/
@Component
public class EmqttPredicate {
public Boolean test(MqttEvent event){
//测试内容
return Boolean.FALSE;
}
}
package com.mx.cneeds.server.user.mqtt.utils;
/**
* @ClassName TopicName
* @Author zzrdark
* @Date 2019-12-26 14:48
* @Description TODO
**/
public enum TopicName {
ROLL_CALL_DEFAULT(1,"listenDefault"),
ROLL_CALL_2(2,"hello");
private final Integer key;
private final String value;
private TopicName(Integer key,String value){
this.key = key;
this.value = value;
}
public Integer getKey() {
return key;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return this.value;
}
}
......@@ -42,6 +42,18 @@ spring:
# wall:
# config:
# multi-statement-allow: true
mqtt:
username: admin
password: public
host-url: tcp://120.25.162.101:1883
# client-id: zhjsbackground${random.value}
client-id: server
# default-topic: brokers/1
# default-topic: brokers/2
default-topic: brokers/#
# default-topic: $SYS/brokers/+/clients/#
completionTimeout: 3000
keepAlive: 60
eureka:
......
......@@ -57,6 +57,10 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment