新增Redis消息队列实现

This commit is contained in:
启航 2024-03-07 16:28:39 +08:00
parent 63aca636ba
commit 994ffa029e
9 changed files with 174 additions and 9 deletions

View File

@ -55,6 +55,7 @@
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.qihang</groupId> <groupId>com.qihang</groupId>
<artifactId>security</artifactId> <artifactId>security</artifactId>

View File

@ -20,4 +20,11 @@ public class JdApi
System.out.println( "Hello jd-api!" ); System.out.println( "Hello jd-api!" );
SpringApplication.run(JdApi.class, args); SpringApplication.run(JdApi.class, args);
} }
/**
* redis消息监听器容器
*/
} }

View File

@ -1,12 +1,27 @@
package com.qihang.jd.controller; package com.qihang.jd.controller;
import com.qihang.jd.mq.MqUtils;
import com.qihang.jd.mq.MqMessage;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@AllArgsConstructor
@RestController @RestController
public class HomeController { public class HomeController {
private final MqUtils mqUtils;
// private final RedisTemplate redisTemplate;
//
// public void sendMessage(String channel, Object message) {
// redisTemplate.convertAndSend(channel, message);
//// redisTemplate.convertAndSend(channel, message);
// }
@GetMapping("/") @GetMapping("/")
public String home(){ public String home(){
MqMessage mqVo = MqMessage.build(1,"52332555");
mqUtils.sendMessage("channel",mqVo);
return "{'code':0,'msg':'请通过api访问'}"; return "{'code':0,'msg':'请通过api访问'}";
} }
} }

View File

@ -3,17 +3,13 @@ package com.qihang.jd.controller;
import com.jd.open.api.sdk.DefaultJdClient; import com.jd.open.api.sdk.DefaultJdClient;
import com.jd.open.api.sdk.JdClient; import com.jd.open.api.sdk.JdClient;
import com.jd.open.api.sdk.request.order.PopOrderEnSearchRequest; import com.jd.open.api.sdk.request.order.PopOrderEnSearchRequest;
import com.jd.open.api.sdk.request.refundapply.PopAfsRefundapplyQuerylistRequest;
import com.jd.open.api.sdk.request.ware.SkuReadSearchSkuListRequest;
import com.jd.open.api.sdk.request.ware.WareReadSearchWare4ValidRequest;
import com.jd.open.api.sdk.response.order.PopOrderEnSearchResponse; import com.jd.open.api.sdk.response.order.PopOrderEnSearchResponse;
import com.jd.open.api.sdk.response.refundapply.PopAfsRefundapplyQuerylistResponse;
import com.jd.open.api.sdk.response.ware.SkuReadSearchSkuListResponse;
import com.jd.open.api.sdk.response.ware.WareReadSearchWare4ValidResponse;
import com.qihang.common.common.ApiResult; import com.qihang.common.common.ApiResult;
import com.qihang.common.enums.HttpStatus; import com.qihang.common.enums.HttpStatus;
import com.qihang.jd.common.ApiCommon; import com.qihang.jd.common.ApiCommon;
import com.qihang.jd.common.PullRequest; import com.qihang.jd.common.PullRequest;
import com.qihang.jd.mq.MqMessage;
import com.qihang.jd.mq.MqUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -25,8 +21,14 @@ import org.springframework.web.bind.annotation.RestController;
@AllArgsConstructor @AllArgsConstructor
public class OrderApiController { public class OrderApiController {
private final ApiCommon apiCommon; private final ApiCommon apiCommon;
// private final RedisCache redisCache;
private final MqUtils mqUtils;
@RequestMapping(value = "/pull_list", method = RequestMethod.POST) @RequestMapping(value = "/pull_list", method = RequestMethod.POST)
public Object pullList(@RequestBody PullRequest params) throws Exception { public Object pullList(@RequestBody PullRequest params) throws Exception {
// Object cacheObject = redisCache.getCacheObject("jdorder");
if (params.getShopId() == null || params.getShopId() <= 0) { if (params.getShopId() == null || params.getShopId() <= 0) {
// return ApiResul new ApiResult(HttpStatus.PARAMS_ERROR, "参数错误没有店铺Id"); // return ApiResul new ApiResult(HttpStatus.PARAMS_ERROR, "参数错误没有店铺Id");
return ApiResult.build(HttpStatus.PARAMS_ERROR, "参数错误没有店铺Id"); return ApiResult.build(HttpStatus.PARAMS_ERROR, "参数错误没有店铺Id");
@ -61,7 +63,9 @@ public class OrderApiController {
PopOrderEnSearchRequest request =new PopOrderEnSearchRequest(); PopOrderEnSearchRequest request =new PopOrderEnSearchRequest();
request.setStartDate("2024-02-06 00:20:35"); request.setStartDate("2024-02-06 00:20:35");
request.setEndDate("2024-03-05 15:20:35"); request.setEndDate("2024-03-05 15:20:35");
request.setOrderState("WAIT_GOODS_RECEIVE_CONFIRM"); // request.setOrderState("WAIT_SELLER_STOCK_OUT,WAIT_GOODS_RECEIVE_CONFIRM,WAIT_SELLER_DELIVERY,PAUSE,FINISHED_L,TRADE_CANCELED,LOCKED,POP_ORDER_PAUSE");
request.setOrderState("");
// request.setOrderState("ALL");
// request.setOptionalFields("orderId,venderId"); // request.setOptionalFields("orderId,venderId");
// request.setSourceId("JOS"); // request.setSourceId("JOS");
request.setOptionalFields("venderId,orderId,orderType,payType,orderTotalPrice,orderSellerPrice,orderPayment,freightPrice,sellerDiscount,orderState" + request.setOptionalFields("venderId,orderId,orderType,payType,orderTotalPrice,orderSellerPrice,orderPayment,freightPrice,sellerDiscount,orderState" +
@ -73,7 +77,8 @@ public class OrderApiController {
request.setSortType(1); request.setSortType(1);
request.setDateType(0); request.setDateType(0);
PopOrderEnSearchResponse response=client.execute(request); PopOrderEnSearchResponse response=client.execute(request);
MqMessage mqVo = MqMessage.build(1,"52332555000");
mqUtils.sendApiMessage(mqVo);
return response; return response;
} }
} }

View File

@ -0,0 +1,38 @@
package com.qihang.jd.mq;
import com.alibaba.fastjson2.JSON;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
@AllArgsConstructor
@Component
public class ApiMessageReceiver implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(ApiMessageReceiver.class);
private final RedisTemplate redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
String deserialize = valueSerializer.deserialize(message.getBody());
//message deserialize 都能拿到msg
logger.info("Received <" + message + ">");
System.out.println("deserialize = " + deserialize);
String messageContent = new String(message.getBody());
MqMessage vo = JSON.parseObject(messageContent, MqMessage.class);
System.out.println(vo.getMqType());
if(vo.getMqType() == 1){
// 有新订单插入新订单到shop_order
logger.info("有新订单插入新订单到shop_order");
}
}
}

View File

@ -0,0 +1,16 @@
package com.qihang.jd.mq;
import lombok.Data;
@Data
public class MqMessage {
private int mqType;// 消息类型1订单消息2退款消息
private String keyId;//主键ID
public static MqMessage build(int mqType , String keyId){
MqMessage result = new MqMessage();
result.setMqType(mqType);
result.setKeyId(keyId);
return result;
}
}

View File

@ -0,0 +1,18 @@
package com.qihang.jd.mq;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@AllArgsConstructor
@Component
public class MqUtils {
private final RedisTemplate redisTemplate;
public void sendMessage(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
public void sendApiMessage(MqMessage message) {
redisTemplate.convertAndSend("ApiMessage", message);
}
}

View File

@ -0,0 +1,65 @@
package com.qihang.jd.mq;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@AllArgsConstructor
@Configuration
@AutoConfigureAfter({ApiMessageReceiver.class})
public class SubscriberConfig {
private final RedisTemplate redisTemplate;
/**
* 消息监听适配器注入接受消息方法输入方法名字 反射方法
*
* @param apiMessageReceiver
* @return
*/
@Bean
public MessageListenerAdapter getMessageListenerAdapter(ApiMessageReceiver apiMessageReceiver) {
//当没有实现MessageListener时需要写接收消息的方法名字实现了就不用写receiveMessage了
// return new MessageListenerAdapter(apiMessageReceiver, "receiveMessage");
return new MessageListenerAdapter(apiMessageReceiver);
}
/**
* 创建消息监听容器
*
* @param redisConnectionFactory
* @param messageListenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("TOPIC_USERNAME"));
return redisMessageListenerContainer;
}
/**
*
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅频道通配符*表示任意多个占位符
// API 消息订阅
container.addMessageListener(new ApiMessageReceiver(redisTemplate), new PatternTopic("ApiMessage*"));
return container;
}
}

View File

@ -19,7 +19,7 @@ public class OrderTask implements IPollableService {
@Override @Override
public String getCronExpression() { public String getCronExpression() {
SysTask task = taskService.getById(1); SysTask task = taskService.getById(2);
// return "0/1 * * * * ?"; // return "0/1 * * * * ?";
return task.getCron(); return task.getCron();
} }