diff --git a/ms-biz/src/main/java/com/ms/api/service/impl/OpOrderServiceImpl.java b/ms-biz/src/main/java/com/ms/api/service/impl/OpOrderServiceImpl.java index 975e16f1..63121f17 100644 --- a/ms-biz/src/main/java/com/ms/api/service/impl/OpOrderServiceImpl.java +++ b/ms-biz/src/main/java/com/ms/api/service/impl/OpOrderServiceImpl.java @@ -99,6 +99,7 @@ import com.ms.dal.entity.OpOrderPhase; import com.ms.dal.entity.OpOrderPriceProtection; import com.ms.dal.entity.OpOrderRsyncInfo; import com.ms.dal.entity.OpOrderRsyncLog; +import com.ms.dal.entity.OpOrderRsyncPageBuffer; import com.ms.dal.entity.OpOrderRsyncPageLog; import com.ms.dal.entity.OpOrderRsyncPageQueue; import com.ms.dal.entity.OpOrderRsyncQueue; @@ -1987,11 +1988,79 @@ public class OpOrderServiceImpl implements OpOrderService { @Override public void moveOpPageBufferToQueue(Map shopIdAndMoveCntMap) { log.info("start moveOpPageBufferToQueue"); - int startBufferId = 0; + Long startBufferId = 0L; int limit = 1000; - for (Map.Entry entry : shopIdAndMoveCntMap.entrySet()) { + while (shopIdAndMoveCntMap.size() > 0) { + List chunkBufferList = opOrderRsyncPageBufferMapper.selectByBufferIdAndLimit(startBufferId, limit); + if (ObjectUtil.isEmpty(chunkBufferList)) { + break; + } + startBufferId = chunkBufferList.stream().map(OpOrderRsyncPageBuffer::getOpOrderRsyncPageBufferId).max(Long::compare).orElse(0L); + startMoveOpPageBufferToQueue(chunkBufferList, shopIdAndMoveCntMap); + shopIdAndMoveCntMap = shopIdAndMoveCntMap.entrySet().stream().filter(r -> r.getValue() > 0) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + log.info("end moveOpPageBufferToQueue"); + } + + private ApiResult startMoveOpPageBufferToQueue(List bufferList, Map shopIdAndMoveCntMap) { + List deleteBufferIds = new ArrayList<>(); + List insertQueueList = new ArrayList<>(); + List deleteShopIds = new ArrayList<>(); + for (OpOrderRsyncPageBuffer buffer : bufferList) { + Long shopId = buffer.getShopId(); + if (!shopIdAndMoveCntMap.containsKey(shopId) || shopIdAndMoveCntMap.get(shopId) < 0) { + continue; + } + Long cnt = shopIdAndMoveCntMap.get(shopId) - 1; + if (cnt < 0) { + deleteShopIds.add(shopId); + } + OpOrderRsyncPageQueue queue = new OpOrderRsyncPageQueue(); + queue.setOpOrderRsyncQueueId(buffer.getOpOrderRsyncQueueId()); + queue.setShopId(shopId); + queue.setGmtStartModified(buffer.getGmtStartModified()); + queue.setGmtEndModified(buffer.getGmtEndModified()); + queue.setPageSize(buffer.getPageSize()); + queue.setPage(buffer.getPage()); + queue.setPageCount(buffer.getPageCount()); + queue.setPreviewTotal(buffer.getPreviewTotal()); + queue.setLocked(0L); + queue.setGmtCreate(new Date()); + queue.setGmtModified(new Date()); + insertQueueList.add(queue); + deleteBufferIds.add(buffer.getOpOrderRsyncPageBufferId()); + } + for (Long id : deleteShopIds) { + shopIdAndMoveCntMap.remove(id); + } + Long startQueueId = null; + TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + try { + if (ObjectUtil.isNotEmpty(insertQueueList)) { + opOrderRsyncPageQueueMapper.insertBatch(insertQueueList); + startQueueId = insertQueueList.get(insertQueueList.size() - 1).getOpOrderRsyncPageQueueId(); + } + if (ObjectUtil.isNotEmpty(deleteBufferIds)) { + opOrderRsyncPageBufferMapper.deleteByIdList(deleteBufferIds); + } + dataSourceTransactionManager.commit(transactionStatus); //手动提交 + } catch (Exception e) { + dataSourceTransactionManager.rollback(transactionStatus); //事务回滚 + return ApiResult.fail("数据处理失败,进行回滚。"); } + if (ObjectUtil.isNotNull(startQueueId)) { + int queueCount = insertQueueList.size(); + int queueIdLoop = 0; + for (Long queueId = startQueueId; queueId < (queueCount + startQueueId); queueId++) { + OpOrderRsyncPageQueue queueData = insertQueueList.get(queueIdLoop); + String queueRedisKey = RedisKeyConst.getQueueRedisKey(TblConst.op_order_rsync_page_queue, false); + redisTemplate.opsForList().leftPush(queueRedisKey, queueId.toString()); + queueIdLoop++; + } + } + return ApiResult.ok(); } private void doCheckRsyncOpEnd(Long shopId, Long queueId) { diff --git a/ms-biz/src/main/java/com/ms/api/spi/timer/MoveOpPageBufferTimerService.java b/ms-biz/src/main/java/com/ms/api/spi/timer/MoveOpPageBufferTimerService.java new file mode 100644 index 00000000..5fe71dd5 --- /dev/null +++ b/ms-biz/src/main/java/com/ms/api/spi/timer/MoveOpPageBufferTimerService.java @@ -0,0 +1,28 @@ +package com.ms.api.spi.timer; + +import com.jinritemai.cloud.base.api.BaseRequest; +import com.jinritemai.cloud.base.api.BaseResponse; +import com.jinritemai.cloud.base.api.ExtensionService; +import com.jinritemai.cloud.base.api.ExtensionServiceHandler; +import com.ms.api.common.R; +import com.ms.api.common.Ret; +import com.ms.api.common.TimerBaseService; +import com.ms.api.dto.ItemDTO; +import com.ms.api.task.MoveOpPageBufferTaskService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +@ExtensionService("moveOpPageBufferTimer") +@Slf4j +public class MoveOpPageBufferTimerService extends TimerBaseService implements ExtensionServiceHandler { + + @Autowired + MoveOpPageBufferTaskService moveOpPageBufferTaskService; + + @Override + public BaseResponse handle(BaseRequest req) { + moveOpPageBufferTaskService.runTask(); + return R.ok(Ret.success()); + } + +} diff --git a/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageBufferMapper.java b/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageBufferMapper.java index 8ba180b4..fec09b85 100644 --- a/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageBufferMapper.java +++ b/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageBufferMapper.java @@ -1,5 +1,7 @@ package com.ms.dal.mapper; +import java.util.List; + import com.ms.dal.entity.OpOrderRsyncPageBuffer; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; @@ -23,4 +25,8 @@ public interface OpOrderRsyncPageBufferMapper { int updateByPrimaryKey(OpOrderRsyncPageBuffer record); int selectCountByShopIdAndQueueId(@Param("shopId")Long shopId, @Param("queueId")Long queueId); + + List selectByBufferIdAndLimit(@Param("startBufferId")Long startBufferId, @Param("limit")int limit); + + void deleteByIdList(@Param("deleteBufferIds")List deleteBufferIds); } diff --git a/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageQueueMapper.java b/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageQueueMapper.java index 6e59f4e5..b2d18cba 100644 --- a/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageQueueMapper.java +++ b/ms-dal/src/main/java/com/ms/dal/mapper/OpOrderRsyncPageQueueMapper.java @@ -24,7 +24,7 @@ public interface OpOrderRsyncPageQueueMapper { int updateByPrimaryKey(OpOrderRsyncPageQueue record); - void insertBatch(@Param("list") List list); + Long insertBatch(@Param("list") List list); OpOrderRsyncPageQueue queryFirstRowByQueueId(@Param("queueId") String queueId); diff --git a/ms-dal/src/main/resources/mapper/OpOrderRsyncPageBufferMapper.xml b/ms-dal/src/main/resources/mapper/OpOrderRsyncPageBufferMapper.xml index 12273ff3..52679060 100644 --- a/ms-dal/src/main/resources/mapper/OpOrderRsyncPageBufferMapper.xml +++ b/ms-dal/src/main/resources/mapper/OpOrderRsyncPageBufferMapper.xml @@ -29,15 +29,28 @@ select from op_order_rsync_page_buffer - where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} + where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} + delete from op_order_rsync_page_buffer - where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} + where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} + + + delete from op_order_rsync_page_buffer + where op_order_rsync_page_buffer_id IN + + #{item} + insert into op_order_rsync_page_buffer @@ -113,11 +126,11 @@ gmt_modified = #{gmtModified,jdbcType=TIMESTAMP}, - where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} + where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} update op_order_rsync_page_buffer - set + set op_order_rsync_queue_id = #{opOrderRsyncQueueId,jdbcType=BIGINT}, shop_id = #{shopId,jdbcType=BIGINT}, gmt_start_modified = #{gmtStartModified,jdbcType=TIMESTAMP}, @@ -128,6 +141,6 @@ preview_total = #{previewTotal,jdbcType=SMALLINT}, gmt_create = #{gmtCreate,jdbcType=TIMESTAMP}, gmt_modified = #{gmtModified,jdbcType=TIMESTAMP} - where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} + where op_order_rsync_page_buffer_id = #{opOrderRsyncPageBufferId,jdbcType=BIGINT} diff --git a/ms-dal/src/main/resources/mapper/OpOrderRsyncPageQueueMapper.xml b/ms-dal/src/main/resources/mapper/OpOrderRsyncPageQueueMapper.xml index 490c7faf..c583ed7a 100644 --- a/ms-dal/src/main/resources/mapper/OpOrderRsyncPageQueueMapper.xml +++ b/ms-dal/src/main/resources/mapper/OpOrderRsyncPageQueueMapper.xml @@ -99,7 +99,7 @@ #{gmtModified,jdbcType=TIMESTAMP}, - + insert into op_order_rsync_page_queue ( op_order_rsync_page_queue_id,op_order_rsync_queue_id,shop_id ,gmt_start_modified,gmt_end_modified,page_size diff --git a/ms-web/src/main/java/com/ms/web/OrderTestController.java b/ms-web/src/main/java/com/ms/web/OrderTestController.java index c392663c..6459c9c8 100644 --- a/ms-web/src/main/java/com/ms/web/OrderTestController.java +++ b/ms-web/src/main/java/com/ms/web/OrderTestController.java @@ -25,6 +25,7 @@ import com.ms.api.dto.order.SearchDsOrderListRequestDTO; import com.ms.api.service.DistributionOrderService; import com.ms.api.service.OpOrderService; import com.ms.api.service.PurchaseOrderService; +import com.ms.api.tool.CommonTool; import com.ms.api.util.OrderUtil; import com.ms.api.util.PurchaseOrderUtil; import com.ms.dal.mapper.OpOrderExtMapper; @@ -33,6 +34,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -138,9 +140,9 @@ public class OrderTestController { return R.ok(Ret.success(result)); } - @PostMapping("/manualRsyncDdOrders") - public BaseResponse manualRsyncDdOrders(@RequestBody ManualRsyncDdOrdersDTO dto) { - ApiResult apiResult = orderUtil.rsyncOrder(AppConst.TEST_SHOP_ID, dto.getOrderIds()); + @GetMapping("/manualRsyncDdOrders") + public BaseResponse manualRsyncDdOrders(@RequestParam String orderIds) { + ApiResult apiResult = orderUtil.rsyncOrder(AppConst.TEST_SHOP_ID, CommonTool.splitWithComma(orderIds)); if (!apiResult.isSuccess()) { return R.fail(apiResult.getMsg()); }