moveOpPageBufferTimer

20230922-ljl-fixBug
daixiaogang 1 year ago
parent cc520ab5ee
commit 6f2aa02a57

@ -99,6 +99,7 @@ import com.ms.dal.entity.OpOrderPhase;
import com.ms.dal.entity.OpOrderPriceProtection;
import com.ms.dal.entity.OpOrderRsyncInfo;
import com.ms.dal.entity.OpOrderRsyncLog;
import com.ms.dal.entity.OpOrderRsyncPageBuffer;
import com.ms.dal.entity.OpOrderRsyncPageLog;
import com.ms.dal.entity.OpOrderRsyncPageQueue;
import com.ms.dal.entity.OpOrderRsyncQueue;
@ -1987,11 +1988,79 @@ public class OpOrderServiceImpl implements OpOrderService {
@Override
public void moveOpPageBufferToQueue(Map<Long, Long> shopIdAndMoveCntMap) {
log.info("start moveOpPageBufferToQueue");
int startBufferId = 0;
Long startBufferId = 0L;
int limit = 1000;
for (Map.Entry<Long, Long> entry : shopIdAndMoveCntMap.entrySet()) {
while (shopIdAndMoveCntMap.size() > 0) {
List<OpOrderRsyncPageBuffer> chunkBufferList = opOrderRsyncPageBufferMapper.selectByBufferIdAndLimit(startBufferId, limit);
if (ObjectUtil.isEmpty(chunkBufferList)) {
break;
}
startBufferId = chunkBufferList.stream().map(OpOrderRsyncPageBuffer::getOpOrderRsyncPageBufferId).max(Long::compare).orElse(0L);
startMoveOpPageBufferToQueue(chunkBufferList, shopIdAndMoveCntMap);
shopIdAndMoveCntMap = shopIdAndMoveCntMap.entrySet().stream().filter(r -> r.getValue() > 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
log.info("end moveOpPageBufferToQueue");
}
private ApiResult startMoveOpPageBufferToQueue(List<OpOrderRsyncPageBuffer> bufferList, Map<Long, Long> shopIdAndMoveCntMap) {
List<Long> deleteBufferIds = new ArrayList<>();
List<OpOrderRsyncPageQueue> insertQueueList = new ArrayList<>();
List<Long> deleteShopIds = new ArrayList<>();
for (OpOrderRsyncPageBuffer buffer : bufferList) {
Long shopId = buffer.getShopId();
if (!shopIdAndMoveCntMap.containsKey(shopId) || shopIdAndMoveCntMap.get(shopId) < 0) {
continue;
}
Long cnt = shopIdAndMoveCntMap.get(shopId) - 1;
if (cnt < 0) {
deleteShopIds.add(shopId);
}
OpOrderRsyncPageQueue queue = new OpOrderRsyncPageQueue();
queue.setOpOrderRsyncQueueId(buffer.getOpOrderRsyncQueueId());
queue.setShopId(shopId);
queue.setGmtStartModified(buffer.getGmtStartModified());
queue.setGmtEndModified(buffer.getGmtEndModified());
queue.setPageSize(buffer.getPageSize());
queue.setPage(buffer.getPage());
queue.setPageCount(buffer.getPageCount());
queue.setPreviewTotal(buffer.getPreviewTotal());
queue.setLocked(0L);
queue.setGmtCreate(new Date());
queue.setGmtModified(new Date());
insertQueueList.add(queue);
deleteBufferIds.add(buffer.getOpOrderRsyncPageBufferId());
}
for (Long id : deleteShopIds) {
shopIdAndMoveCntMap.remove(id);
}
Long startQueueId = null;
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
if (ObjectUtil.isNotEmpty(insertQueueList)) {
opOrderRsyncPageQueueMapper.insertBatch(insertQueueList);
startQueueId = insertQueueList.get(insertQueueList.size() - 1).getOpOrderRsyncPageQueueId();
}
if (ObjectUtil.isNotEmpty(deleteBufferIds)) {
opOrderRsyncPageBufferMapper.deleteByIdList(deleteBufferIds);
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
} catch (Exception e) {
dataSourceTransactionManager.rollback(transactionStatus); //事务回滚
return ApiResult.fail("数据处理失败,进行回滚。");
}
if (ObjectUtil.isNotNull(startQueueId)) {
int queueCount = insertQueueList.size();
int queueIdLoop = 0;
for (Long queueId = startQueueId; queueId < (queueCount + startQueueId); queueId++) {
OpOrderRsyncPageQueue queueData = insertQueueList.get(queueIdLoop);
String queueRedisKey = RedisKeyConst.getQueueRedisKey(TblConst.op_order_rsync_page_queue, false);
redisTemplate.opsForList().leftPush(queueRedisKey, queueId.toString());
queueIdLoop++;
}
}
return ApiResult.ok();
}
private void doCheckRsyncOpEnd(Long shopId, Long queueId) {

@ -0,0 +1,28 @@
package com.ms.api.spi.timer;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.TimerBaseService;
import com.ms.api.dto.ItemDTO;
import com.ms.api.task.MoveOpPageBufferTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ExtensionService("moveOpPageBufferTimer")
@Slf4j
public class MoveOpPageBufferTimerService extends TimerBaseService implements ExtensionServiceHandler<ItemDTO, Ret> {
@Autowired
MoveOpPageBufferTaskService moveOpPageBufferTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<ItemDTO> req) {
moveOpPageBufferTaskService.runTask();
return R.ok(Ret.success());
}
}

@ -1,5 +1,7 @@
package com.ms.dal.mapper;
import java.util.List;
import com.ms.dal.entity.OpOrderRsyncPageBuffer;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@ -23,4 +25,8 @@ public interface OpOrderRsyncPageBufferMapper {
int updateByPrimaryKey(OpOrderRsyncPageBuffer record);
int selectCountByShopIdAndQueueId(@Param("shopId")Long shopId, @Param("queueId")Long queueId);
List<OpOrderRsyncPageBuffer> selectByBufferIdAndLimit(@Param("startBufferId")Long startBufferId, @Param("limit")int limit);
void deleteByIdList(@Param("deleteBufferIds")List<Long> deleteBufferIds);
}

@ -24,7 +24,7 @@ public interface OpOrderRsyncPageQueueMapper {
int updateByPrimaryKey(OpOrderRsyncPageQueue record);
void insertBatch(@Param("list") List<OpOrderRsyncPageQueue> list);
Long insertBatch(@Param("list") List<OpOrderRsyncPageQueue> list);
OpOrderRsyncPageQueue queryFirstRowByQueueId(@Param("queueId") String queueId);

@ -34,11 +34,24 @@
<select id="selectCountByShopIdAndQueueId" resultType="java.lang.Integer">
select count(*) from op_order_rsync_page_buffer where shop_id = #{shopId} and op_order_rsync_queue_id = #{queueId}
</select>
<select id="selectByBufferIdAndLimit" resultType="com.ms.dal.entity.OpOrderRsyncPageBuffer">
select * from op_order_rsync_page_buffer
where op_order_rsync_page_buffer_id > #{startBufferId}
order by op_order_rsync_page_buffer_id asc
limit #{limit}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.Long">
delete from op_order_rsync_page_buffer
where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT}
</delete>
<delete id="deleteByIdList">
delete from op_order_rsync_page_buffer
where op_order_rsync_page_buffer_id IN
<foreach collection="deleteBufferIds" index="index" item="item" open="(" separator="," close=")">
#{item}
</foreach>
</delete>
<insert id="insert" keyColumn="op_order_rsync_page_buffer_id" keyProperty="opOrderRsyncPageBufferId" parameterType="com.ms.dal.entity.OpOrderRsyncPageBuffer" useGeneratedKeys="true">
insert into op_order_rsync_page_buffer
( op_order_rsync_page_buffer_id,op_order_rsync_queue_id,shop_id

@ -99,7 +99,7 @@
<if test="gmtModified != null">#{gmtModified,jdbcType=TIMESTAMP},</if>
</trim>
</insert>
<insert id="insertBatch" >
<insert id="insertBatch" useGeneratedKeys="true" keyColumn="op_order_rsync_page_queue_id" keyProperty="opOrderRsyncPageQueueId" >
insert into op_order_rsync_page_queue
( op_order_rsync_page_queue_id,op_order_rsync_queue_id,shop_id
,gmt_start_modified,gmt_end_modified,page_size

@ -25,6 +25,7 @@ import com.ms.api.dto.order.SearchDsOrderListRequestDTO;
import com.ms.api.service.DistributionOrderService;
import com.ms.api.service.OpOrderService;
import com.ms.api.service.PurchaseOrderService;
import com.ms.api.tool.CommonTool;
import com.ms.api.util.OrderUtil;
import com.ms.api.util.PurchaseOrderUtil;
import com.ms.dal.mapper.OpOrderExtMapper;
@ -33,6 +34,7 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
@ -138,9 +140,9 @@ public class OrderTestController {
return R.ok(Ret.success(result));
}
@PostMapping("/manualRsyncDdOrders")
public BaseResponse<Ret> manualRsyncDdOrders(@RequestBody ManualRsyncDdOrdersDTO dto) {
ApiResult apiResult = orderUtil.rsyncOrder(AppConst.TEST_SHOP_ID, dto.getOrderIds());
@GetMapping("/manualRsyncDdOrders")
public BaseResponse<Ret> manualRsyncDdOrders(@RequestParam String orderIds) {
ApiResult apiResult = orderUtil.rsyncOrder(AppConst.TEST_SHOP_ID, CommonTool.splitWithComma(orderIds));
if (!apiResult.isSuccess()) {
return R.fail(apiResult.getMsg());
}

Loading…
Cancel
Save