diff --git a/ms-biz/src/main/java/com/ms/biz/quque/ConfigConsumer.java b/ms-biz/src/main/java/com/ms/biz/quque/ConfigConsumer.java new file mode 100644 index 0000000..c14fb0f --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/ConfigConsumer.java @@ -0,0 +1,11 @@ +package com.ms.biz.quque; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; + +import java.util.function.Consumer; + +public class ConfigConsumer implements Consumer { + @Override + public void accept(DefaultMQPushConsumer defaultMQPushConsumer) { + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/ConsumerListener.java b/ms-biz/src/main/java/com/ms/biz/quque/ConsumerListener.java new file mode 100644 index 0000000..a6d6b1c --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/ConsumerListener.java @@ -0,0 +1,13 @@ +package com.ms.biz.quque; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Documented +public @interface ConsumerListener { + String consumerGroup() default ""; + String topic() default ""; + int consumeThreadMax() default -1; + int consumeThreadMin() default -1; +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/ConsumerRegistryProcessor.java b/ms-biz/src/main/java/com/ms/biz/quque/ConsumerRegistryProcessor.java new file mode 100644 index 0000000..4028ee2 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/ConsumerRegistryProcessor.java @@ -0,0 +1,27 @@ +package com.ms.biz.quque; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; +import org.springframework.context.annotation.AnnotationBeanNameGenerator; +import org.springframework.context.annotation.ClassPathBeanDefinitionScanner; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.type.filter.AnnotationTypeFilter; + +@Configuration +public class ConsumerRegistryProcessor implements BeanDefinitionRegistryPostProcessor { + @Override + public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException { + ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(beanDefinitionRegistry); + scanner.setBeanNameGenerator(new AnnotationBeanNameGenerator()); + // 定义需要扫描的注解 -- 自定义注解 + scanner.addIncludeFilter(new AnnotationTypeFilter(ConsumerListener.class)); + // 定义扫描的包 + scanner.scan("com.ms.biz.queue.consumer"); + } + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { + } +} \ No newline at end of file diff --git a/ms-biz/src/main/java/com/ms/biz/quque/Group.java b/ms-biz/src/main/java/com/ms/biz/quque/Group.java new file mode 100644 index 0000000..88f3abf --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/Group.java @@ -0,0 +1,6 @@ +package com.ms.biz.quque; + +public class Group { + public static final String GROUP_DS_MESSAGE = "miaoshougoods_ds_message"; +} + diff --git a/ms-biz/src/main/java/com/ms/biz/quque/InstanceHolder.java b/ms-biz/src/main/java/com/ms/biz/quque/InstanceHolder.java new file mode 100644 index 0000000..9f7f61d --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/InstanceHolder.java @@ -0,0 +1,27 @@ +package com.ms.biz.quque; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class InstanceHolder { + private static final Logger log = LoggerFactory.getLogger(InstanceHolder.class); + private static final Map MAP = new ConcurrentHashMap<>(); + + private InstanceHolder() { + } + + public static Object getIns(String key) { + return MAP.get(key); + } + + public static void setIns(String key, Object ins) { + Object previous = MAP.put(key, ins); + if (!Objects.isNull(previous)) { + throw new IllegalStateException("Create instances repeatedly!"); + } + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/MqFactory.java b/ms-biz/src/main/java/com/ms/biz/quque/MqFactory.java new file mode 100644 index 0000000..fb98d05 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/MqFactory.java @@ -0,0 +1,119 @@ +package com.ms.biz.quque; + +import java.util.Objects; +import java.util.function.Consumer; + +import com.ms.biz.tool.SystemTool; +import org.apache.logging.log4j.util.Strings; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqFactory { + private static final Logger log = LoggerFactory.getLogger(MqFactory.class); + + private MqFactory() { + } + + public static DefaultMQProducer newMqProducer(String group) throws MQClientException { + String key = "mq-producer-" + group; + Object ins = InstanceHolder.getIns(key); + if (!Objects.isNull(ins)) { + return (DefaultMQProducer)ins; + } else { + DefaultMQProducer producer = new DefaultMQProducer(group, getAclRPCHook(), true, null); + producer.setNamesrvAddr(getMqUrl()); + + try { + producer.start(); + log.info("#newMqProducer start success! GID: {}", group); + InstanceHolder.setIns(key, producer); + return producer; + } catch (MQClientException var5) { + log.error("#newMqProducer start went wrong! GID: " + group, var5); + throw var5; + } + } + } + + public static DefaultMQPushConsumer newMqConsumer(String group, String topic, MessageListenerConcurrently messageListenerConcurrently) { + return newMqConsumer(group, topic, messageListenerConcurrently, null); + } + + public static DefaultMQPushConsumer newMqConsumer(String group, String topic, MessageListenerConcurrently messageListenerConcurrently, Consumer configConsumer) { + String key = "mq-consumer-" + group + topic; + Object ins = InstanceHolder.getIns(key); + if (!Objects.isNull(ins)) { + return (DefaultMQPushConsumer)ins; + } else { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, (String)null); + consumer.setNamesrvAddr(getMqUrl()); + if (!Objects.isNull(configConsumer)) { + configConsumer.accept(consumer); + } + consumer.setInstanceName(group + "_" + topic); + + String logInfo = String.format(" GID: %s , topic: %s", group, topic); + + try { + consumer.subscribe(topic, "*"); + } catch (MQClientException var10) { + log.error("#newMqConsumer subscribe went wrong !" + logInfo, var10); + throw new RuntimeException(var10); + } + + consumer.registerMessageListener(messageListenerConcurrently); + + try { + consumer.start(); + } catch (MQClientException var9) { + log.error("#newMqConsumer start went wrong !" + logInfo, var9); + throw new RuntimeException(var9); + } + + log.info("#newMqConsumer start success! {}", logInfo); + InstanceHolder.setIns(key, consumer); + return consumer; + } + } + + public static String generateProdTopic(String topic) { + return getMqInstanceId() + "%" + topic; + } + + private static String getMqInstanceId() { + return SystemTool.isDevEnv() ? "local" : System.getenv("sys-rmq-id"); + } + + private static String getMqUrl() { + String url = "127.0.0.1"; + String port = "9876"; + if (!SystemTool.isDevEnv()) { + url = System.getenv("sys-rmq-url"); + port = System.getenv("sys-rmq-port"); + } + return !Strings.isEmpty(url) && !Strings.isEmpty(port) ? String.format("http://%s:%s", url, port) : null; + } + + private static RPCHook getAclRPCHook() { + if (SystemTool.isDevEnv()) { + return null; + } + return new AclClientRPCHook(new SessionCredentials(getAK(), getSK())); + } + + private static String getAK() { + return SystemTool.isDevEnv() ? "" : System.getenv("sys-rmq-ak"); + } + + private static String getSK() { + return SystemTool.isDevEnv() ? "" : System.getenv("sys-rmq-sk"); + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/MqListener.java b/ms-biz/src/main/java/com/ms/biz/quque/MqListener.java new file mode 100644 index 0000000..8537fe4 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/MqListener.java @@ -0,0 +1,59 @@ +package com.ms.biz.quque; + +import com.jinritemai.cloud.base.core.util.LogUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Map; +import java.util.Objects; + +@Slf4j +@Component +public class MqListener implements ApplicationListener { + @Resource + private ApplicationContext applicationContext; + + @Override + public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { + LogUtils.setLogId("MqListener_" + System.currentTimeMillis()); + Map beansWithAnnotationMap = this.applicationContext.getBeansWithAnnotation(ConsumerListener.class); + log.info("beansWithAnnotationMap: " + beansWithAnnotationMap); + for (Map.Entry entry: beansWithAnnotationMap.entrySet()) { + MessageListenerConcurrently concurrently = (MessageListenerConcurrently)entry.getValue(); + ConsumerListener annotation = concurrently.getClass().getAnnotation(ConsumerListener.class); + if (Objects.equals(annotation.consumerGroup(), "")) { + log.error("Consumer " + entry.getKey() + "'s consumerGroup is empty, skip!"); + continue; + } + if (Objects.equals(annotation.topic(), "")) { + log.error("Consumer " + entry.getKey() + "'s topic is empty, skip!"); + continue; + } + + String topic = Topic.rebuild(annotation.topic()); + String group = annotation.consumerGroup(); + + log.info("Start consumer: " + entry.getKey() + " group:" + group + " topic:" + topic); + DefaultMQPushConsumer consumer = MqFactory.newMqConsumer( + group, + topic, + (MessageListenerConcurrently)entry.getValue(), + defaultMQPushConsumer -> { + if (annotation.consumeThreadMax() >= 0) { + defaultMQPushConsumer.setConsumeThreadMax(annotation.consumeThreadMax()); + } + if (annotation.consumeThreadMin() >= 0) { + defaultMQPushConsumer.setConsumeThreadMin(annotation.consumeThreadMin()); + } + } + ); + log.info("consumerInfo: " + consumer);; + } + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/Producer.java b/ms-biz/src/main/java/com/ms/biz/quque/Producer.java new file mode 100644 index 0000000..7b17398 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/Producer.java @@ -0,0 +1,38 @@ +package com.ms.biz.quque; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; + +@Slf4j +public class Producer { + private final String group; + + public Producer(String group) { + this.group = group; + } + + private DefaultMQProducer getGroupProducer() { + try { + return MqFactory.newMqProducer(group); + } catch (MQClientException e) { + log.error("New producer error:", e); + throw new RuntimeException("Can not find producer!"); + } + } + + public SendResult send(Message msg, long timeout) { + log.info("Send message keys: " + msg.getKeys()); + log.info("Send message topic: " + msg.getTopic()); + log.info("Send message body: " + new String(msg.getBody())); + try { + return getGroupProducer().send(msg, timeout); + } catch (Throwable e) { + //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 + log.error("Send message failed!", e); + throw new RuntimeException("Send message failed"); + } + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/Topic.java b/ms-biz/src/main/java/com/ms/biz/quque/Topic.java new file mode 100644 index 0000000..0417cdb --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/Topic.java @@ -0,0 +1,14 @@ +package com.ms.biz.quque; + +import com.ms.biz.tool.SystemTool; + +public class Topic { + public static final String TOPIC_DS_MESSAGE = "miaoshougoods_rsync_order"; + public static String rebuild(String topic) { + if (SystemTool.isDevEnv()) { + return "local%" + topic; + } else { + return MqFactory.generateProdTopic(topic); + } + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/consumer/BaseConsumer.java b/ms-biz/src/main/java/com/ms/biz/quque/consumer/BaseConsumer.java new file mode 100644 index 0000000..482f3cc --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/consumer/BaseConsumer.java @@ -0,0 +1,6 @@ +package com.ms.biz.quque.consumer; + +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; + +public abstract class BaseConsumer implements MessageListenerConcurrently { +} \ No newline at end of file diff --git a/ms-biz/src/main/java/com/ms/biz/quque/consumer/DsMessageConsumer.java b/ms-biz/src/main/java/com/ms/biz/quque/consumer/DsMessageConsumer.java new file mode 100644 index 0000000..d8f3eb6 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/consumer/DsMessageConsumer.java @@ -0,0 +1,42 @@ +package com.ms.biz.quque.consumer; + +import com.alibaba.fastjson.JSON; +import com.jinritemai.cloud.base.core.util.LogUtils; +import com.ms.biz.quque.ConsumerListener; +import com.ms.biz.quque.Group; +import com.ms.biz.quque.Topic; +import com.ms.biz.quque.message.DsMessage; +import com.ms.biz.service.DsMessageService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + + +@Slf4j +@ConsumerListener(consumerGroup = Group.GROUP_DS_MESSAGE, topic = Topic.TOPIC_DS_MESSAGE) +@Component +public class DsMessageConsumer extends BaseConsumer { + @Autowired + private DsMessageService dsMessageService; + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + LogUtils.setLogId(LogUtils.generateLogId()); + log.info("DsMessageConsumer Receive New Messages: {}", JSON.toJSONString(list)); + for (MessageExt message: list) { + try { + DsMessage dsMessage = JSON.parseObject(new String(message.getBody()), DsMessage.class); + dsMessageService.processDsMessage(dsMessage.getDsMessageId()); + } catch (Throwable e) { + log.error("DsMessageConsumerError", e); + } + } + log.info("DsMessageConsumer consumeMessage success"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/message/BaseMessage.java b/ms-biz/src/main/java/com/ms/biz/quque/message/BaseMessage.java new file mode 100644 index 0000000..4c531d3 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/message/BaseMessage.java @@ -0,0 +1,56 @@ +package com.ms.biz.quque.message; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.annotation.JSONField; +import com.ms.biz.quque.Topic; +import lombok.Data; +import org.apache.rocketmq.common.message.Message; + +import java.util.UUID; + +@Data +public class BaseMessage { + @JSONField(serialize = false) + protected String keys = ""; + + @JSONField(serialize = false) + protected String topic = ""; + + @JSONField(serialize = false) + protected String tags = ""; + + BaseMessage() { + this.keys = generateKey(); + } + + BaseMessage(String keys) { + this.keys = keys; + } + + protected byte[] getBody() { + return JSON.toJSONString(this).getBytes(); + } + + public Message build() { + Message message = new Message(); + message.setBody(this.getBody()); + message.setKeys(keys); + return message; + } + + public Message build(String topic) { + return new Message(Topic.rebuild(topic), "", keys, this.getBody()); + } + + public Message build(String topic, String tags, int flag, boolean waitStoreMsgOK) { + return new Message(Topic.rebuild(topic), tags, keys, flag, this.getBody(), waitStoreMsgOK); + } + + public Message build(String topic, String tags) { + return new Message(Topic.rebuild(topic), tags, keys, 0, this.getBody(), true); + } + + protected String generateKey() { + return UUID.randomUUID().toString(); + } +} diff --git a/ms-biz/src/main/java/com/ms/biz/quque/message/DsMessage.java b/ms-biz/src/main/java/com/ms/biz/quque/message/DsMessage.java new file mode 100644 index 0000000..d8ff98e --- /dev/null +++ b/ms-biz/src/main/java/com/ms/biz/quque/message/DsMessage.java @@ -0,0 +1,10 @@ +package com.ms.biz.quque.message; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +public class DsMessage extends BaseMessage { + private Long dsMessageId; +} diff --git a/ms-biz/src/main/java/com/ms/biz/service/impl/DistributionOrderServiceImpl.java b/ms-biz/src/main/java/com/ms/biz/service/impl/DistributionOrderServiceImpl.java index 57bcfea..ea16922 100644 --- a/ms-biz/src/main/java/com/ms/biz/service/impl/DistributionOrderServiceImpl.java +++ b/ms-biz/src/main/java/com/ms/biz/service/impl/DistributionOrderServiceImpl.java @@ -76,13 +76,7 @@ public class DistributionOrderServiceImpl implements DistributionOrderService { } Set purOrderIds = new HashSet<>(orderIdAndPurOrderIdMap.values()); - List existsPlatformPurchaseOrders = platformPurchaseOrderMapper.getListByPurOrderIds(new ArrayList<>(purOrderIds)); - Set existsPurOrderIds = existsPlatformPurchaseOrders.stream().map(PlatformPurchaseOrder::getPurOrderId).collect(Collectors.toSet()); - if (!existsPurOrderIds.isEmpty()) { - throw new RuntimeException("已经存在采购单号:" + String.join(",", existsPurOrderIds)); - } - - storeCreateOrders(shopId, param.getOrders()); + storeCreateOrders(shopId, param.getOrders(), purOrderIds); BatchCreateDistributionOrdersRequestDTO request = buildBatchCreateDistributionOrdersRequestDTO(param); BatchCreateDistributionOrdersResponseDTO response = dsApiService.batchCreateDistributionOrders(param.getShopId(), request); @@ -90,6 +84,14 @@ public class DistributionOrderServiceImpl implements DistributionOrderService { // String res = "{\"result\":\"success\",\"successCount\":1,\"failCount\":0,\"successList\":[{\"result\":\"success\",\"data\":{\"success\":true,\"totalSuccessAmount\":1399,\"orderId\":\"3630260954504828754\",\"postFee\":0,\"mutilOrders\":null},\"createAlibabaOrderLogId\":1420,\"platformOrderIds\":[\"6917543139367389081\"],\"isEncryptOrder\":\"true\",\"isSupportEncryptOrder\":1,\"isUseManualConsignee\":0,\"flow\":\"fenxiao\",\"includeSplitJxhy\":false,\"sourceOrderIds\":[\"3630260954504828754\"],\"payRet\":null,\"platformOrderId\":\"6917543139367389081\",\"relatePurchaseOrderInfos\":[{\"platformOrderId\":\"6917543139367389081\",\"purchasePlatform\":\"1688DS\",\"purchaseOrderSn\":\"3630260954504828754\",\"purchaseOrderBuyer\":\"\\u6843\\u72f8\\u706c\\u5c0f\\u7231\",\"purchaseOrderSeller\":\"\\u4e49\\u4e4c\\u73cd\\u4eab\\u8d38\\u6613\",\"purchaseOrderPayment\":\"13.99\",\"purchaseOrderFullname\":\"\\u7535*\",\"purchaseOrderMobile\":\"1********00\",\"purchaseOrderFullAddress\":\"\\u5317\\u4eac \\u5317\\u4eac\\u5e02 \\u6d77\\u6dc0\\u533a \\u6d77\\u6dc0\\u8857\\u9053 \\u4e92\\u8054*******\",\"purchaseOrderStartTime\":\"2023-11-14 14:29:33\",\"platformPushStatus\":\"wait\",\"purchaseOrderLogisticsName\":\"\",\"purchaseOrderWaybillCode\":\"\",\"purchaseOrderStatus\":\"wait_pay\",\"purchaseOrderFlow\":\"fenxiao\",\"alibabaOrderStatus\":\"waitbuyerpay\",\"isUseManualConsignee\":0,\"isDsEncryptOrder\":0,\"isSupportEncryptOrder\":0,\"logisticsIsAccept\":false,\"items\":[{\"skuId\":\"3659175955997568\",\"wareId\":\"3649883085652281308\",\"purchaseNum\":\"1\",\"purchasePrice\":\"13.99\",\"sourceItemId\":\"686872273204\",\"subItemId\":\"3630260954504828754\"}],\"mergePurchasePlatformOrderIds\":null,\"purchaseOrderUpdateTime\":\"2023-11-14 14:29:33\",\"purchaseOrderPayTime\":null}]}],\"errorList\":null,\"lockSourceItems\":[{\"createPurchaseOrderLockSkuId\":\"2077\",\"createPurchaseOrderLockId\":\"20001608\",\"sourceItemId\":\"686872273204\",\"sourceSkuId\":\"4897420288982\",\"createPurchaseOrderLogId\":\"1477\",\"sourceOrderId\":\"3630260954504828754\",\"sourceNum\":null,\"reqId\":null,\"status\":\"lock\",\"gmtCreate\":\"2023-11-14 14:29:32\",\"gmtModified\":\"2023-11-14 14:29:33\",\"platformOrderId\":\"6917543139367389081\",\"platformItemId\":\"3649883085652281308\",\"platformSkuId\":\"3659175955997568\"}]}"; // BatchCreateDistributionOrdersResponseDTO response = JSON.parseObject(res, BatchCreateDistributionOrdersResponseDTO.class); + List errorList = new ArrayList<>(); + for (DsErrorRetDTO errorRet: response.getErrorList()) { + errorList.add(errorRet.getReason()); + } + if (!errorList.isEmpty()) { + throw new RuntimeException(String.join(";", errorList)); + } + List orderResults = new ArrayList<>(); List purchaseOrders = new ArrayList<>(); List purchaseOrderItems = new ArrayList<>(); @@ -243,10 +245,16 @@ public class DistributionOrderServiceImpl implements DistributionOrderService { return orderResult; } - private void storeCreateOrders(Long shopId, List orders) { + private void storeCreateOrders(Long shopId, List orders, Set purOrderIds) { + List existsPlatformPurchaseOrders = platformPurchaseOrderMapper.getListByPurOrderIds(new ArrayList<>(purOrderIds)); + Set existsPurOrderIds = existsPlatformPurchaseOrders.stream().map(PlatformPurchaseOrder::getPurOrderId).collect(Collectors.toSet()); + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { for (CreateOrder order: orders) { + if (existsPurOrderIds.contains(order.getPurOrderId())) { + continue; + } savePlatformPurchaseOrder(shopId, order); savePlatformPurchaseOrderAddress(shopId, order.getPurOrderId(), order.getPostAddress()); savePlatformPurchaseOrderMaskAddress(shopId, order.getPurOrderId(), order.getMaskPostAddress()); diff --git a/ms-biz/src/main/java/com/ms/biz/service/impl/DsMessageServiceImpl.java b/ms-biz/src/main/java/com/ms/biz/service/impl/DsMessageServiceImpl.java index 829f159..84b5800 100644 --- a/ms-biz/src/main/java/com/ms/biz/service/impl/DsMessageServiceImpl.java +++ b/ms-biz/src/main/java/com/ms/biz/service/impl/DsMessageServiceImpl.java @@ -17,6 +17,9 @@ import com.ms.biz.consts.DsMessageConst; import com.ms.biz.consts.PurchaseOrderConst; import com.ms.biz.consts.Refund1688Const; import com.ms.biz.consts.StatusConst; +import com.ms.biz.quque.Group; +import com.ms.biz.quque.Producer; +import com.ms.biz.quque.Topic; import com.ms.biz.service.DsMessageConsumerBufferService; import com.ms.biz.service.DsMessageConsumerQueueService; import com.ms.biz.service.DsMessageService; @@ -139,16 +142,12 @@ public class DsMessageServiceImpl implements DsMessageService { @Override public void addDsMessage(Long shopId, Long platformPushMsgId, String bizId, String bizType, String data) { - TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); - try { - log.info("addDsMessagePlatformPushMsgId" + platformPushMsgId); - DsMessage dsMessage = add(shopId, platformPushMsgId, bizId, bizType, data); - dsMessageConsumerBufferService.addBuffer(shopId, dsMessage.getDsMessageId(), null); - dataSourceTransactionManager.commit(transactionStatus); - } catch (Throwable e) { - log.error("addDsMessageError", e); - dataSourceTransactionManager.rollback(transactionStatus); - } + log.info("addDsMessagePlatformPushMsgId" + platformPushMsgId); + DsMessage dsMessage = add(shopId, platformPushMsgId, bizId, bizType, data); + com.ms.biz.quque.message.DsMessage message = new com.ms.biz.quque.message.DsMessage(); + message.setDsMessageId(dsMessage.getDsMessageId()); + Producer producer = new Producer(Group.GROUP_DS_MESSAGE); + producer.send(message.build(Topic.TOPIC_DS_MESSAGE), 5000); } @Override @@ -172,8 +171,29 @@ public class DsMessageServiceImpl implements DsMessageService { dsMessageConsumerQueueService.deleteByPrimaryKey(queue.getDsMessageConsumerQueueId()); } + @Override public void processDsMessage(Long dsMessageId) { DsMessage dsMessage = selectByPrimaryKey(dsMessageId); + String reason = "成功"; + String status = StatusConst.success; + try { + handleDsMessage(dsMessage); + } catch (RuntimeException e) { + log.error("processDsMessageError", e); + reason = e.getMessage(); + status = StatusConst.fail; + } catch (Throwable e) { + log.error("processDsMessageError", e); + reason = "系统异常"; + status = StatusConst.fail; + } + dsMessage.setStatus(status); + dsMessage.setReason(reason); + dsMessage.setGmtModified(new Date()); + dsMessageMapper.updateByPrimaryKeySelective(dsMessage); + } + + private void handleDsMessage(DsMessage dsMessage) { if (dsMessage == null) { throw new RuntimeException("消息不存在"); } diff --git a/ms-biz/src/main/java/com/ms/biz/spi/callback/DsMessageCallbackService.java b/ms-biz/src/main/java/com/ms/biz/spi/callback/DsMessageCallbackService.java index bcbf00a..6c33291 100644 --- a/ms-biz/src/main/java/com/ms/biz/spi/callback/DsMessageCallbackService.java +++ b/ms-biz/src/main/java/com/ms/biz/spi/callback/DsMessageCallbackService.java @@ -40,6 +40,9 @@ public class DsMessageCallbackService extends SPIBaseService implements Extensio } catch (RuntimeException e) { log.error("dsMessageCallbackError", e); return R.ok(Ret.fail(e.getMessage())); + } catch (Throwable e) { + log.error("dsMessageCallbackError", e); + return R.ok(Ret.fail("系统异常")); } } }