抖店消息处理

20230922-ljl-fixBug
wangchaoxu 1 year ago
parent 29281507e2
commit 47386f031c

@ -9,4 +9,6 @@ public interface QueueService {
Map<Long, Long> allocBufferToQueueByShopAvg(String bufferName, String queueName, JSONObject filter, int maxQueueCount, Integer shopLimit);
int unlockTimeoutSolidRedisQueue(String tableName, int heartbeatExpiredSeconds, String status);
int unlockTimeoutQueue(String tableName, int heartbeatExpiredSeconds, String status);
}

@ -227,6 +227,27 @@ public class QueueServiceImpl implements QueueService {
return extraWhere.toString();
}
public int unlockTimeoutQueue(String tableName, int heartbeatExpiredSeconds, String status) {
DateTime deadTime = DateUtil.offsetSecond(new Date(), -heartbeatExpiredSeconds);
String where = String.format(" WHERE locked > 0 AND gmt_last_heartbeat < '%s' ", DateUtil.formatDateTime(deadTime));
if (StrUtil.isBlank(status)) {
where = where.concat(String.format(" and `status` = %s ", status));
}
List<Map<String, Object>> queueList = queueMapper.select(tableName, where);
if (ObjectUtil.isEmpty(queueList)) {
return 0;
}
String queueIdColumnName = QueueConst.getQueueIdColumnName(tableName);
int ar1 = 0;
for (Map<String, Object> queue : queueList) {
Long queueId = (Long) queue.get(queueIdColumnName);
if (ObjectUtil.isNotNull(queueId)) {
ar1 = queueMapper.updateLocked(tableName, queueIdColumnName, queueId, 0, DateUtil.now());
}
}
return queueList.size();
}
// public void lockSolidRedisQueue(Integer timerLockId, String queueName, boolean isAdminSelf) {
// String queueRedisKey = RedisKeyConst.getQueueRedisKey(queueName, isAdminSelf);
// String queueId = redisTemplate.opsForList().rightPop(queueRedisKey);

@ -0,0 +1,27 @@
package com.ms.api.spi.timer;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.TimerBaseService;
import com.ms.api.dto.ItemDTO;
import com.ms.api.task.UnlockDoudianMsgTimeoutQueueTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ExtensionService("unlockDoudianMsgTimeoutQueueTimer")
@Slf4j
public class UnlockDoudianMsgTimeoutQueueTimerService extends TimerBaseService implements ExtensionServiceHandler<ItemDTO, Ret> {
@Autowired
private UnlockDoudianMsgTimeoutQueueTaskService unlockDoudianMsgTimeoutQueueTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<ItemDTO> req) {
unlockDoudianMsgTimeoutQueueTaskService.runTask();
return R.ok(Ret.success());
}
}

@ -0,0 +1,79 @@
package com.ms.api.task;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.TblConst;
import com.ms.api.service.QueueService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Executor;
@Configuration
@Component
@Slf4j
public class UnlockDoudianMsgTimeoutQueueTaskService extends TaskBaseService {
@Autowired
private QueueService queueService;
/**
*
*/
public int getCorePoolSiz() {
return 3;
}
/**
*
*/
public String getTaskExecutorName() {
return "unlockDoudianMsgTimeoutQueueTaskPool";
}
@Bean(name = "unlockDoudianMsgTimeoutQueueTaskPool")
@Override
public Executor getAsyncExecutor() {
return super.getAsyncExecutor();
}
@Resource(name = "unlockDoudianMsgTimeoutQueueTaskPool")
protected Executor taskPool;
@Override
protected Executor getTaskPool() {
return taskPool;
}
@Async("unlockDoudianMsgTimeoutQueueTaskPool")
@Override
public void runTask() {
super.runTask();
}
@Override
public Object getTask() {
return true;
}
@Override
public Object processTask(Object params) {
if (params == null) {
return null;
}
queueService.unlockTimeoutSolidRedisQueue(TblConst.doudian_aftersale_msg_queue, 300, null);
queueService.unlockTimeoutSolidRedisQueue(TblConst.doudian_trade_msg_queue, 300, null);
queueService.unlockTimeoutQueue(TblConst.doudian_msg_parse_queue, 300, null);
return true;
}
@Override
public void clearTask(Object params) {
if (params == null) {
return;
}
}
}
Loading…
Cancel
Save