2024-06-13 14:31:30 +08:00
|
|
|
package com.qihang.oms.mq;
|
|
|
|
|
|
2024-06-13 15:36:39 +08:00
|
|
|
import com.alibaba.fastjson2.JSON;
|
|
|
|
|
import com.qihang.common.enums.EnumShopType;
|
|
|
|
|
import com.qihang.common.mq.MqMessage;
|
2024-06-13 14:31:30 +08:00
|
|
|
import com.qihang.common.mq.MqType;
|
2024-06-13 15:36:39 +08:00
|
|
|
import com.qihang.oms.service.ErpSaleOrderService;
|
|
|
|
|
import lombok.AllArgsConstructor;
|
2024-06-13 14:31:30 +08:00
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.springframework.data.redis.connection.Message;
|
|
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Kafka 消息消费者
|
|
|
|
|
*/
|
2024-06-13 15:36:39 +08:00
|
|
|
@AllArgsConstructor
|
2024-06-13 14:31:30 +08:00
|
|
|
@Component
|
|
|
|
|
public class KafkaMQConsumer {
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(KafkaMQConsumer.class);
|
2024-06-13 15:36:39 +08:00
|
|
|
private final ErpSaleOrderService orderService;
|
2024-06-13 14:31:30 +08:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 订单消息处理
|
|
|
|
|
* @param message
|
|
|
|
|
*/
|
|
|
|
|
@KafkaListener(topics = {MqType.ORDER_MQ})
|
|
|
|
|
public void onOrderMessage(ConsumerRecord<String,Object> message) {
|
|
|
|
|
logger.info("收到kafka消息ORDER============"+message.topic()+"====="+message.partition()+"======"+message.value());
|
2024-06-13 15:36:39 +08:00
|
|
|
|
|
|
|
|
MqMessage vo = JSON.parseObject(message.value().toString(), MqMessage.class);
|
|
|
|
|
if(vo.getShopType().getIndex() == EnumShopType.JD.getIndex()) {
|
|
|
|
|
logger.info("Kafka订单消息JD"+vo.getKeyId());
|
|
|
|
|
orderService.jdOrderMessage(vo.getKeyId());
|
|
|
|
|
}else if(vo.getShopType().getIndex() == EnumShopType.TAO.getIndex()) {
|
|
|
|
|
logger.info("Kafka订单消息TAO"+vo.getKeyId());
|
|
|
|
|
orderService.taoOrderMessage(vo.getKeyId());
|
2024-06-13 18:18:44 +08:00
|
|
|
}else if(vo.getShopType().getIndex() == EnumShopType.PDD.getIndex()) {
|
|
|
|
|
logger.info("Kafka订单消息PDD"+vo.getKeyId());
|
|
|
|
|
orderService.pddOrderMessage(vo.getKeyId());
|
|
|
|
|
} else if(vo.getShopType().getIndex() == EnumShopType.DOU.getIndex()) {
|
|
|
|
|
logger.info("Kafka订单消息DOU"+vo.getKeyId());
|
|
|
|
|
orderService.douOrderMessage(vo.getKeyId());
|
|
|
|
|
} else if(vo.getShopType().getIndex() == EnumShopType.WEI.getIndex()) {
|
|
|
|
|
logger.info("Kafka订单消息WEI"+vo.getKeyId());
|
|
|
|
|
// orderService.weiOrderMessage(vo.getKeyId());
|
2024-06-13 15:36:39 +08:00
|
|
|
}
|
2024-06-13 14:31:30 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 退款消息处理
|
|
|
|
|
* @param message
|
|
|
|
|
*/
|
|
|
|
|
@KafkaListener(topics = {MqType.REFUND_MQ})
|
|
|
|
|
public void onRefundMessage(ConsumerRecord<String,Object> message) {
|
|
|
|
|
logger.info("收到kafka消息REFUND============"+message.topic()+"====="+message.partition()+"======"+message.value());
|
|
|
|
|
}
|
|
|
|
|
}
|