test
parent
8587383596
commit
46cf209ddf
@ -0,0 +1,11 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class ConfigConsumer implements Consumer<DefaultMQPushConsumer> {
|
||||
@Override
|
||||
public void accept(DefaultMQPushConsumer defaultMQPushConsumer) {
|
||||
}
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.TYPE)
|
||||
@Documented
|
||||
public @interface ConsumerListener {
|
||||
String consumerGroup() default "";
|
||||
String topic() default "";
|
||||
int consumeThreadMax() default -1;
|
||||
int consumeThreadMin() default -1;
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
|
||||
import org.springframework.context.annotation.AnnotationBeanNameGenerator;
|
||||
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.type.filter.AnnotationTypeFilter;
|
||||
|
||||
@Configuration
|
||||
public class ConsumerRegistryProcessor implements BeanDefinitionRegistryPostProcessor {
|
||||
@Override
|
||||
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
|
||||
ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(beanDefinitionRegistry);
|
||||
scanner.setBeanNameGenerator(new AnnotationBeanNameGenerator());
|
||||
// 定义需要扫描的注解 -- 自定义注解
|
||||
scanner.addIncludeFilter(new AnnotationTypeFilter(ConsumerListener.class));
|
||||
// 定义扫描的包
|
||||
scanner.scan("com.ms.biz.queue.consumer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
|
||||
}
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
public class Group {
|
||||
public static final String GROUP_DS_MESSAGE = "miaoshougoods_ds_message";
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class InstanceHolder {
|
||||
private static final Logger log = LoggerFactory.getLogger(InstanceHolder.class);
|
||||
private static final Map<String, Object> MAP = new ConcurrentHashMap<>();
|
||||
|
||||
private InstanceHolder() {
|
||||
}
|
||||
|
||||
public static Object getIns(String key) {
|
||||
return MAP.get(key);
|
||||
}
|
||||
|
||||
public static void setIns(String key, Object ins) {
|
||||
Object previous = MAP.put(key, ins);
|
||||
if (!Objects.isNull(previous)) {
|
||||
throw new IllegalStateException("Create instances repeatedly!");
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,119 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.ms.biz.tool.SystemTool;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MqFactory {
|
||||
private static final Logger log = LoggerFactory.getLogger(MqFactory.class);
|
||||
|
||||
private MqFactory() {
|
||||
}
|
||||
|
||||
public static DefaultMQProducer newMqProducer(String group) throws MQClientException {
|
||||
String key = "mq-producer-" + group;
|
||||
Object ins = InstanceHolder.getIns(key);
|
||||
if (!Objects.isNull(ins)) {
|
||||
return (DefaultMQProducer)ins;
|
||||
} else {
|
||||
DefaultMQProducer producer = new DefaultMQProducer(group, getAclRPCHook(), true, null);
|
||||
producer.setNamesrvAddr(getMqUrl());
|
||||
|
||||
try {
|
||||
producer.start();
|
||||
log.info("#newMqProducer start success! GID: {}", group);
|
||||
InstanceHolder.setIns(key, producer);
|
||||
return producer;
|
||||
} catch (MQClientException var5) {
|
||||
log.error("#newMqProducer start went wrong! GID: " + group, var5);
|
||||
throw var5;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static DefaultMQPushConsumer newMqConsumer(String group, String topic, MessageListenerConcurrently messageListenerConcurrently) {
|
||||
return newMqConsumer(group, topic, messageListenerConcurrently, null);
|
||||
}
|
||||
|
||||
public static DefaultMQPushConsumer newMqConsumer(String group, String topic, MessageListenerConcurrently messageListenerConcurrently, Consumer<DefaultMQPushConsumer> configConsumer) {
|
||||
String key = "mq-consumer-" + group + topic;
|
||||
Object ins = InstanceHolder.getIns(key);
|
||||
if (!Objects.isNull(ins)) {
|
||||
return (DefaultMQPushConsumer)ins;
|
||||
} else {
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, (String)null);
|
||||
consumer.setNamesrvAddr(getMqUrl());
|
||||
if (!Objects.isNull(configConsumer)) {
|
||||
configConsumer.accept(consumer);
|
||||
}
|
||||
consumer.setInstanceName(group + "_" + topic);
|
||||
|
||||
String logInfo = String.format(" GID: %s , topic: %s", group, topic);
|
||||
|
||||
try {
|
||||
consumer.subscribe(topic, "*");
|
||||
} catch (MQClientException var10) {
|
||||
log.error("#newMqConsumer subscribe went wrong !" + logInfo, var10);
|
||||
throw new RuntimeException(var10);
|
||||
}
|
||||
|
||||
consumer.registerMessageListener(messageListenerConcurrently);
|
||||
|
||||
try {
|
||||
consumer.start();
|
||||
} catch (MQClientException var9) {
|
||||
log.error("#newMqConsumer start went wrong !" + logInfo, var9);
|
||||
throw new RuntimeException(var9);
|
||||
}
|
||||
|
||||
log.info("#newMqConsumer start success! {}", logInfo);
|
||||
InstanceHolder.setIns(key, consumer);
|
||||
return consumer;
|
||||
}
|
||||
}
|
||||
|
||||
public static String generateProdTopic(String topic) {
|
||||
return getMqInstanceId() + "%" + topic;
|
||||
}
|
||||
|
||||
private static String getMqInstanceId() {
|
||||
return SystemTool.isDevEnv() ? "local" : System.getenv("sys-rmq-id");
|
||||
}
|
||||
|
||||
private static String getMqUrl() {
|
||||
String url = "127.0.0.1";
|
||||
String port = "9876";
|
||||
if (!SystemTool.isDevEnv()) {
|
||||
url = System.getenv("sys-rmq-url");
|
||||
port = System.getenv("sys-rmq-port");
|
||||
}
|
||||
return !Strings.isEmpty(url) && !Strings.isEmpty(port) ? String.format("http://%s:%s", url, port) : null;
|
||||
}
|
||||
|
||||
private static RPCHook getAclRPCHook() {
|
||||
if (SystemTool.isDevEnv()) {
|
||||
return null;
|
||||
}
|
||||
return new AclClientRPCHook(new SessionCredentials(getAK(), getSK()));
|
||||
}
|
||||
|
||||
private static String getAK() {
|
||||
return SystemTool.isDevEnv() ? "" : System.getenv("sys-rmq-ak");
|
||||
}
|
||||
|
||||
private static String getSK() {
|
||||
return SystemTool.isDevEnv() ? "" : System.getenv("sys-rmq-sk");
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import com.jinritemai.cloud.base.core.util.LogUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class MqListener implements ApplicationListener<ContextRefreshedEvent> {
|
||||
@Resource
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
|
||||
LogUtils.setLogId("MqListener_" + System.currentTimeMillis());
|
||||
Map<String, Object> beansWithAnnotationMap = this.applicationContext.getBeansWithAnnotation(ConsumerListener.class);
|
||||
log.info("beansWithAnnotationMap: " + beansWithAnnotationMap);
|
||||
for (Map.Entry<String, Object> entry: beansWithAnnotationMap.entrySet()) {
|
||||
MessageListenerConcurrently concurrently = (MessageListenerConcurrently)entry.getValue();
|
||||
ConsumerListener annotation = concurrently.getClass().getAnnotation(ConsumerListener.class);
|
||||
if (Objects.equals(annotation.consumerGroup(), "")) {
|
||||
log.error("Consumer " + entry.getKey() + "'s consumerGroup is empty, skip!");
|
||||
continue;
|
||||
}
|
||||
if (Objects.equals(annotation.topic(), "")) {
|
||||
log.error("Consumer " + entry.getKey() + "'s topic is empty, skip!");
|
||||
continue;
|
||||
}
|
||||
|
||||
String topic = Topic.rebuild(annotation.topic());
|
||||
String group = annotation.consumerGroup();
|
||||
|
||||
log.info("Start consumer: " + entry.getKey() + " group:" + group + " topic:" + topic);
|
||||
DefaultMQPushConsumer consumer = MqFactory.newMqConsumer(
|
||||
group,
|
||||
topic,
|
||||
(MessageListenerConcurrently)entry.getValue(),
|
||||
defaultMQPushConsumer -> {
|
||||
if (annotation.consumeThreadMax() >= 0) {
|
||||
defaultMQPushConsumer.setConsumeThreadMax(annotation.consumeThreadMax());
|
||||
}
|
||||
if (annotation.consumeThreadMin() >= 0) {
|
||||
defaultMQPushConsumer.setConsumeThreadMin(annotation.consumeThreadMin());
|
||||
}
|
||||
}
|
||||
);
|
||||
log.info("consumerInfo: " + consumer);;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
|
||||
@Slf4j
|
||||
public class Producer {
|
||||
private final String group;
|
||||
|
||||
public Producer(String group) {
|
||||
this.group = group;
|
||||
}
|
||||
|
||||
private DefaultMQProducer getGroupProducer() {
|
||||
try {
|
||||
return MqFactory.newMqProducer(group);
|
||||
} catch (MQClientException e) {
|
||||
log.error("New producer error:", e);
|
||||
throw new RuntimeException("Can not find producer!");
|
||||
}
|
||||
}
|
||||
|
||||
public SendResult send(Message msg, long timeout) {
|
||||
log.info("Send message keys: " + msg.getKeys());
|
||||
log.info("Send message topic: " + msg.getTopic());
|
||||
log.info("Send message body: " + new String(msg.getBody()));
|
||||
try {
|
||||
return getGroupProducer().send(msg, timeout);
|
||||
} catch (Throwable e) {
|
||||
//消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
|
||||
log.error("Send message failed!", e);
|
||||
throw new RuntimeException("Send message failed");
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
package com.ms.biz.quque;
|
||||
|
||||
import com.ms.biz.tool.SystemTool;
|
||||
|
||||
public class Topic {
|
||||
public static final String TOPIC_DS_MESSAGE = "miaoshougoods_rsync_order";
|
||||
public static String rebuild(String topic) {
|
||||
if (SystemTool.isDevEnv()) {
|
||||
return "local%" + topic;
|
||||
} else {
|
||||
return MqFactory.generateProdTopic(topic);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,6 @@
|
||||
package com.ms.biz.quque.consumer;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
|
||||
public abstract class BaseConsumer implements MessageListenerConcurrently {
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
package com.ms.biz.quque.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.jinritemai.cloud.base.core.util.LogUtils;
|
||||
import com.ms.biz.quque.ConsumerListener;
|
||||
import com.ms.biz.quque.Group;
|
||||
import com.ms.biz.quque.Topic;
|
||||
import com.ms.biz.quque.message.DsMessage;
|
||||
import com.ms.biz.service.DsMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@ConsumerListener(consumerGroup = Group.GROUP_DS_MESSAGE, topic = Topic.TOPIC_DS_MESSAGE)
|
||||
@Component
|
||||
public class DsMessageConsumer extends BaseConsumer {
|
||||
@Autowired
|
||||
private DsMessageService dsMessageService;
|
||||
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
||||
LogUtils.setLogId(LogUtils.generateLogId());
|
||||
log.info("DsMessageConsumer Receive New Messages: {}", JSON.toJSONString(list));
|
||||
for (MessageExt message: list) {
|
||||
try {
|
||||
DsMessage dsMessage = JSON.parseObject(new String(message.getBody()), DsMessage.class);
|
||||
dsMessageService.processDsMessage(dsMessage.getDsMessageId());
|
||||
} catch (Throwable e) {
|
||||
log.error("DsMessageConsumerError", e);
|
||||
}
|
||||
}
|
||||
log.info("DsMessageConsumer consumeMessage success");
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package com.ms.biz.quque.message;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.annotation.JSONField;
|
||||
import com.ms.biz.quque.Topic;
|
||||
import lombok.Data;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
public class BaseMessage {
|
||||
@JSONField(serialize = false)
|
||||
protected String keys = "";
|
||||
|
||||
@JSONField(serialize = false)
|
||||
protected String topic = "";
|
||||
|
||||
@JSONField(serialize = false)
|
||||
protected String tags = "";
|
||||
|
||||
BaseMessage() {
|
||||
this.keys = generateKey();
|
||||
}
|
||||
|
||||
BaseMessage(String keys) {
|
||||
this.keys = keys;
|
||||
}
|
||||
|
||||
protected byte[] getBody() {
|
||||
return JSON.toJSONString(this).getBytes();
|
||||
}
|
||||
|
||||
public Message build() {
|
||||
Message message = new Message();
|
||||
message.setBody(this.getBody());
|
||||
message.setKeys(keys);
|
||||
return message;
|
||||
}
|
||||
|
||||
public Message build(String topic) {
|
||||
return new Message(Topic.rebuild(topic), "", keys, this.getBody());
|
||||
}
|
||||
|
||||
public Message build(String topic, String tags, int flag, boolean waitStoreMsgOK) {
|
||||
return new Message(Topic.rebuild(topic), tags, keys, flag, this.getBody(), waitStoreMsgOK);
|
||||
}
|
||||
|
||||
public Message build(String topic, String tags) {
|
||||
return new Message(Topic.rebuild(topic), tags, keys, 0, this.getBody(), true);
|
||||
}
|
||||
|
||||
protected String generateKey() {
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package com.ms.biz.quque.message;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
public class DsMessage extends BaseMessage {
|
||||
private Long dsMessageId;
|
||||
}
|
Loading…
Reference in New Issue