新增商品搬家的后台任务

20230922-ljl-fixBug
qiushengjie 1 year ago
parent f1eae3ddb9
commit e9001f0b58

@ -48,4 +48,14 @@ public class Ret {
public static Ret fail(String reason) {
return new Ret("fail", reason);
}
@Override
public String toString() {
return "Ret{" +
"result='" + result + '\'' +
", reason='" + reason + '\'' +
", code='" + code + '\'' +
", data=" + data +
'}';
}
}

@ -56,11 +56,11 @@ public abstract class TaskBaseService implements TaskHandler {
log.info("开始执行任务");
log.info("start get one task");
StrObjMap getTaskRet = getTask();
Object getTaskRet = getTask();
log.info("end get one task");
log.info("start process one task");
StrObjMap processTaskRet = processTask(getTaskRet);
Object processTaskRet = processTask(getTaskRet);
log.info("end process task");
log.info("start clear one task");

@ -2,11 +2,11 @@ package com.ms.api.common;
public interface TaskHandler {
StrObjMap getTask();
Object getTask();
StrObjMap processTask(StrObjMap params);
Object processTask(Object params);
void clearTask(StrObjMap params);
void clearTask(Object params);
}

@ -0,0 +1,28 @@
package com.ms.api.spi.timer;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.TimerBaseService;
import com.ms.api.dto.ItemDTO;
import com.ms.api.task.MoveMaterialAuditBufferTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ExtensionService("moveMaterialAuditBufferTimer")
@Slf4j
public class MoveMaterialAuditBufferTimerService extends TimerBaseService implements ExtensionServiceHandler<ItemDTO, Ret> {
@Autowired
MoveMaterialAuditBufferTaskService moveMaterialAuditBufferTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<ItemDTO> req) {
moveMaterialAuditBufferTaskService.runTask();
return R.ok(Ret.success());
}
}

@ -0,0 +1,28 @@
package com.ms.api.spi.timer;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.TimerBaseService;
import com.ms.api.dto.ItemDTO;
import com.ms.api.task.MoveMaterialAuditProcessTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ExtensionService("moveMaterialAuditProcessTimer")
@Slf4j
public class MoveMaterialAuditProcessTimerService extends TimerBaseService implements ExtensionServiceHandler<ItemDTO, Ret> {
@Autowired
MoveMaterialAuditProcessTaskService moveMaterialAuditProcessTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<ItemDTO> req) {
moveMaterialAuditProcessTaskService.runTask();
return R.ok(Ret.success());
}
}

@ -0,0 +1,28 @@
package com.ms.api.spi.timer;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.TimerBaseService;
import com.ms.api.dto.ItemDTO;
import com.ms.api.task.MovePublishBufferTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ExtensionService("movePublishBufferTimer")
@Slf4j
public class MovePublishBufferTimerService extends TimerBaseService implements ExtensionServiceHandler<ItemDTO, Ret> {
@Autowired
MovePublishBufferTaskService movePublishBufferTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<ItemDTO> req) {
movePublishBufferTaskService.runTask();
return R.ok(Ret.success());
}
}

@ -0,0 +1,28 @@
package com.ms.api.spi.timer;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.TimerBaseService;
import com.ms.api.dto.ItemDTO;
import com.ms.api.task.ProcessProductPublishTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ExtensionService("proccessProductPublishTimer")
@Slf4j
public class ProccessProductPublishTimerService extends TimerBaseService implements ExtensionServiceHandler<ItemDTO, Ret> {
@Autowired
ProcessProductPublishTaskService proccessProductPublishTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<ItemDTO> req) {
proccessProductPublishTaskService.runTask();
return R.ok(Ret.success());
}
}

@ -0,0 +1,102 @@
package com.ms.api.task;
import com.ms.api.common.Ret;
import com.ms.api.common.StrObjMap;
import com.ms.api.common.TaskBaseService;
import com.ms.api.tool.CommonTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
/**
* material_audit_buffer
*/
@Configuration
@Component
@Slf4j
public class MoveMaterialAuditBufferTaskService extends TaskBaseService {
/**
*
*/
public int getCorePoolSiz() {
return 3;
}
/**
*
*/
public String getTaskExecutorName() {
return "moveMaterialAuditBufferPool";
}
@Bean(name = "moveMaterialAuditBufferPool")
@Override
public Executor getAsyncExecutor() {
return super.getAsyncExecutor();
}
@Resource(name = "moveMaterialAuditBufferPool")
protected Executor taskPool;
@Override
protected Executor getTaskPool() {
return taskPool;
}
//@Async("moveMaterialAuditBufferPool")
@Override
public void runTask() {
super.runTask();
}
/**
*
*/
@Override
public Object getTask() {
// 循环操作
// 判断queue是否满了
// $queueCnt = $this->db->queryFirstField('select count(1) as cnt from material_audit_status_queue');
// if ($queueCnt > MaterialConst::MaterialAuditStatusQueueMaxCnt) {
// $this->materialAuditStatusBufferToQueueLog->info('queueCnt too large ,end bufferToQueue:' . $queueCnt);
// break;
// }
// 一条条插入到queue中
// 删除buffer中的数据
// 如果取出来的数据小余100条说明buffer中的数据已经取完了结束循环
return null;
}
/**
*
*
* @param params
* @return
*/
@Override
public Object processTask(Object params) {
return null;
}
/**
*
*
* @param params
*/
@Override
public void clearTask(Object params) {
}
}

@ -0,0 +1,103 @@
package com.ms.api.task;
import com.ms.api.common.TaskBaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Executor;
/**
* queue
*/
@Configuration
@Component
@Slf4j
public class MoveMaterialAuditProcessTaskService extends TaskBaseService {
/**
*
*/
public int getCorePoolSiz() {
return 3;
}
/**
*
*/
public String getTaskExecutorName() {
return "moveMaterialAuditProcessPool";
}
@Bean(name = "moveMaterialAuditProcessPool")
@Override
public Executor getAsyncExecutor() {
return super.getAsyncExecutor();
}
@Resource(name = "moveMaterialAuditProcessPool")
protected Executor taskPool;
@Override
protected Executor getTaskPool() {
return taskPool;
}
//@Async("moveMaterialAuditProcessPool")
@Override
public void runTask() {
super.runTask();
}
/**
*
*/
@Override
public Object getTask() {
// 循环处理
// 取一条出来锁,锁住了就循环
return null;
}
/**
*
*
* @param params
* @return
*/
@Override
public Object processTask(Object params) {
// // 获取token 如果获取失败,记录日志
//
// // 获取审核日志
// getShopMaterialBizAuditStatusLog();
//
// // 如果已已经处理,记录日志
//
// // 从抖店获取素材详情,如果失败,记录原因
// getMaterialQueryMaterialDetailFromDd();
//
// // 保存抖店的url到日志表
return null;
}
/**
*
*
* @param params
*/
@Override
public void clearTask(Object params) {
// // 如果失败把队列里面的queue移回到buffer
// moveMaterialAuditStatusQueueToBuffer();
//
// // 如果成功,删除队列里面的任务
//// $this->db->delete('material_audit_status_queue', 'material_audit_status_queue_id = %i', $queueMsg['material_audit_status_queue_id']);
}
}

@ -0,0 +1,171 @@
package com.ms.api.task;
import com.ms.api.common.Ret;
import com.ms.api.common.StrObjMap;
import com.ms.api.common.TaskBaseService;
import com.ms.api.tool.CommonTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
/**
* move_product_publish_buffer
* move_product_publish_queue
*/
@Configuration
@Component
@Slf4j
public class MovePublishBufferTaskService extends TaskBaseService {
/**
*
*/
public int getCorePoolSiz() {
return 3;
}
/**
*
*/
public String getTaskExecutorName() {
return "movePublishBufferTaskPool";
}
@Bean(name = "movePublishBufferTaskPool")
@Override
public Executor getAsyncExecutor() {
return super.getAsyncExecutor();
}
@Resource(name = "movePublishBufferTaskPool")
protected Executor taskPool;
@Override
protected Executor getTaskPool() {
return taskPool;
}
//@Async("movePublishBufferTaskPool")
@Override
public void runTask() {
super.runTask();
}
/**
*
*/
@Override
public Object getTask() {
return getMoveProductPublishBufferShopMap(500);
}
/**
*
*
* @param params
* @return
*/
@Override
public Object processTask(Object params) {
StrObjMap selectedShopMap = (StrObjMap)params;
log.info("start moveProductPublishBufferToQueue");
Ret ret = moveProductPublishBufferToQueue(selectedShopMap);
log.info("end moveProductPublishBufferToQueue ret: " + ret.toString());
return ret;
}
/**
*
*
* @param params
*/
@Override
public void clearTask(Object params) {
}
private StrObjMap getMoveProductPublishBufferShopMap(int maxQueueCount) {
log.info("handle getMoveProductPublishBufferShopMap");
int queueCount = 100; // TODO 查sql获取队列数量
if (queueCount >= maxQueueCount) {
log.info("queueCount " + queueCount + ", maxQueueCount " + maxQueueCount + ", end.");
return null;
}
int leftCount = maxQueueCount - queueCount;
log.info("queueCount " + queueCount + ", maxQueueCount " + maxQueueCount + ", leftCount " + leftCount);
StrObjMap selectedShopMap = countSelectedProductPublishBufferShopMap(leftCount);
log.info("end getMoveProductPublishBufferSelectedShopMap");
return selectedShopMap;
}
private StrObjMap countSelectedProductPublishBufferShopMap(int leftCount){
log.info("handle countSelectedProductPublishBufferShopMap");
StrObjMap ret = statProductPublishBuffer();
StrObjMap priorityShopsMap = (StrObjMap)ret.get("priorityShopsMap");
List<Long> shopIds = (List<Long>)ret.get("shopIds");
log.info("end countSelectedProductPublishBufferShopMap");
// $stopMoveShopIds = $this->db->queryFirstColumn('select shop_id from move_shop_stop where is_limit_publish = 1');
List<Long> stopMoveShopIds = new ArrayList<>(); // TODO 查上面sql获取
shopIds = CommonTool.arrayDiff(shopIds, stopMoveShopIds);
// foreach ($priorityShopsMap as $priority => $shopIdAndCountMap) {
// foreach ($shopIdAndCountMap as $shopId => $count) {
// if (in_array($shopId, $stopMoveShopIds)) {
// unset($priorityShopsMap[$priority][$shopId]);
// }
// }
// }
StrObjMap shopLimitQuotaArray = getShopLimitQuota(shopIds);
log.info("start allocProductPublishQuota");
StrObjMap selectedShopMap = allocProductPublishQuota(leftCount, priorityShopsMap, shopLimitQuotaArray);
log.info("end allocProductPublishQuota");
return selectedShopMap;
}
private StrObjMap statProductPublishBuffer(){
return null;
}
private StrObjMap getShopLimitQuota(List<Long> shopIds){
return null;
}
private StrObjMap allocProductPublishQuota(int leftCount, StrObjMap priorityShopsMap, StrObjMap shopLimitQuotaArray) {
return null;
}
// Buffer移到Queue中
private Ret moveProductPublishBufferToQueue(StrObjMap selectedShopMap) {
StrObjMap bufferRowList = new StrObjMap();
Ret addRet = addPublishQueueBatch(bufferRowList);
//$bufferIds = ZcArrayHelper::getSub($bufferRowList, 'move_product_publish_buffer_id');
List bufferIds = new ArrayList<>(); // TODO
if (CommonTool.isSuccessRet(addRet)){
log.info("start delete move_product_publish_buffer");
//$delRet = $this->db->delete('move_product_publish_buffer', 'move_product_publish_buffer_id in %li', $bufferIds);
Ret delRet = new Ret(); // TODO
log.info("end delete move_product_publish_buffer delRet["+delRet+"] bufferIds total:" + bufferIds.size());
}
// 加到redis队列
// $redisTotal = 0;
// foreach ($bufferRowList as $row) {
// $redisTotal = $this->addPublishQueueToRedis($row['shop_id'], $row['priority'], $row['move_collect_task_detail_id']);
// }
return null;
}
private Ret addPublishQueueBatch(StrObjMap bufferRowList) {
return null;
}
}

@ -0,0 +1,205 @@
package com.ms.api.task;
import com.ms.api.common.StrObjMap;
import com.ms.api.common.TaskBaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.Executor;
/**
* move_product_publish_queue
*/
@Configuration
@Component
@Slf4j
public class ProcessProductPublishTaskService extends TaskBaseService {
/**
*
*/
public int getCorePoolSiz() {
return 3;
}
/**
*
*/
public String getTaskExecutorName() {
return "proccessProductPublishTaskPool";
}
@Bean(name = "proccessProductPublishTaskPool")
@Override
public Executor getAsyncExecutor() {
return super.getAsyncExecutor();
}
@Resource(name = "proccessProductPublishTaskPool")
protected Executor taskPool;
@Override
protected Executor getTaskPool() {
return taskPool;
}
//@Async("proccessProductPublishTaskPool")
@Override
public void runTask() {
super.runTask();
}
/**
*
*/
@Override
public Object getTask() {
log.info("start getProductPublishQueue");
log.info("lockPublishTaskQueue start lockPublishTask");
// 锁定任务并获取一条任务
StrObjMap queueMsg = lockProductPublishQueue();
log.info("lockProductPublishQueue > lockPublishQueue");
if (Objects.isNull(queueMsg)) {
log.info("lockPublishTaskQueue end lockPublishTask");
log.info("locked empty, wait");
return queueMsg;
}
// 更新move_task_detail
// $r = $this->moveService->updateMoveCollectTaskDetail(array (
// 'publish_hostname' => $queueMsg['publish_hostname'] . $hostname,
// 'publish_wait_seconds' => $this->startTime - strtotime($queueMsg['gmt_create'])
// ), $queueMsg['shop_id'], $queueMsg['move_collect_task_detail_id']);
return null;
}
/**
*
*
* @param params
* @return
*/
@Override
public Object processTask(Object params) {
// // 获取源产品数据
// getSourceProductDataByType(queueMsg);
//// $sourceProductDataRet = $this->moveUtil->getSourceProductDataByType($queueMsg);
//// if (CommonTool::isFailRet($sourceProductDataRet)) {
//// return $sourceProductDataRet;
//// }
//// $sourceProductData = $sourceProductDataRet['sourceProductData'];
//
// // 补充数据
// replenishMoveCollectDetailData();
//
// // 检查Cid
// checkTaskDetailCidIsValid();
//
// log.info("start processProductPublishQueue queueMsg");
// // SKU预售类型, 这个需要吗?
// buildSkuPresellTypeBySource();
//
// // 准备搬家发布
// processMoveProductQueuePre();
//
// // 抖店那边先关的请求
// getShopAccessToken();
// buildCreateProductV2Data();
//
// // 图片处理 这边会加入到素材的buffer
// processAndUploadMoveProductImgs
//
// // 信息保存到oss上
// uploadMoveDataToOss();
return null;
}
/**
*
*
* @param params
*/
@Override
public void clearTask(Object params) {
processMovePublishQueueRet();
// 前面已经挂起这边删除que
deleteQueueMsg();
}
private void processMovePublishQueueRet() {
// // 要搬家的所有图片
// getMovePublishAllImgs();
//
// // 图片素材先关
// getUrlAndMaterialIdMapBySourceUrl();
//
// // 移动到pic_queue 挂起
// createMoveProductPublishToPicQueue();
//
// // 保存素材业务信息
// // 素材的task和detail就加入了
// saveMaterialBiz();
//
// //else
// // 限流等情况的处理
// checkIsFrequencyLimitReason();
//
// // 更新搬家detail数据
// updateMoveCollectTaskDetail();
}
private void createMoveProductPublishToPicQueue() {
}
private void saveMaterialBiz() {
}
private void deleteQueueMsg() {
}
private StrObjMap lockProductPublishQueue() {
// 从redis中获取一条任务 是buffer 这是用到是task_detail的id
// $taskDetailId = $this->solidRedis->rPop($moveProductPublishQueueName);
// if (empty($taskDetailId)) {
// $this->productPublishLockRedisLog->info("timerLockId [$timerLockId] move_product_publish_queueName[$moveProductPublishQueueName] taskDetailId $taskDetailId getEmpty");
// return array();
// }
// 锁住public_queue的一条数据, 通过task_detail_id 来找到
// $affectRows = $this->db->update('move_product_publish_queue', array (
// 'locked' => $timerLockId,
// 'gmt_locked' => ZcDbEval::now(),
// 'gmt_modified' => ZcDbEval::now()
// ), "move_collect_task_detail_id = %i AND locked = 0", $taskDetailId);
// if ($affectRows != 1) {
// $this->productPublishLockRedisLog->info("timerLockId [$timerLockId] taskDetailId $taskDetailId move_product_publish_queueName[$moveProductPublishQueueName] locked empty");
// return array();
// }
// 获取锁住的数据
// $lockMsg = $this->db->queryFirstRow("select * from `move_product_publish_queue` where move_collect_task_detail_id = %i", $taskDetailId);
// if (empty($lockMsg)) {
// return $lockMsg;
// }
// 组装EXT信息
buildMoveTaskDetailInfoToQueue();
return null;
}
private void buildMoveTaskDetailInfoToQueue() {
// 组装很多信息, 这个可以直接问下, EXT表
}
}

@ -6,14 +6,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
@Component
@Slf4j
public class TestTaskService extends TaskBaseService {
@ -53,16 +51,16 @@ public class TestTaskService extends TaskBaseService {
}
@Override
public StrObjMap getTask() {
public Object getTask() {
log.info("handle getTask");
return null;
}
@Override
public StrObjMap processTask(StrObjMap params) {
public Object processTask(Object params) {
log.info("handle processTask");
try {
Thread.sleep(1000 * 60);
Thread.sleep(1000 * 60 * 5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -70,7 +68,7 @@ public class TestTaskService extends TaskBaseService {
}
@Override
public void clearTask(StrObjMap params) {
public void clearTask(Object params) {
log.info("handle clearTask");
}

@ -310,6 +310,13 @@ public class CommonTool {
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}
public static List<Long> arrayDiff(List<Long> list1, List<Long> list2) {
List<Long> diff = new ArrayList<>(list1);
diff.removeAll(list2);
return diff;
}
public static void main(String[] args) {
System.out.println(splitWithComma("6921394317136631525|6921399584322230185|6921393247513351270|6921412710844995353"));

Loading…
Cancel
Save