wangchaoxu 1 year ago
commit 9f3516ace6

@ -1,10 +1,12 @@
package com.ms.api.common;
import com.jinritemai.cloud.base.core.util.LogUtils;
import com.ms.api.tool.CommonTool;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.UUID;
import java.util.concurrent.Executor;
@Slf4j
@ -29,12 +31,17 @@ public abstract class TaskBaseService implements TaskHandler {
executor.setCorePoolSize(getCorePoolSiz() + 1);
executor.setMaxPoolSize(getMaxPoolSize());
executor.setQueueCapacity(getQueueCapacity());
executor.setThreadNamePrefix(getTaskExecutorName());
// 增加随机数,在分布式环境下,区分不同的线程池和线程,以进行分布式锁的使用
executor.setThreadNamePrefix(getTaskExecutorName() + UUID.randomUUID());
executor.initialize();
return executor;
}
public void runTask() {
Ret ret = lockTask();
if(CommonTool.isFailRet(ret)){
return;
}
try {
setLogId();
if (isPoolFull()) {
@ -45,9 +52,18 @@ public abstract class TaskBaseService implements TaskHandler {
log.info(e.getMessage());
} catch (Throwable e) {
log.error("任务异常退出, 任务池 " + getTaskExecutorName(), e);
}finally {
unlockTask(ret);
}
}
protected Ret lockTask() {
return CommonTool.successResult();
}
protected void unlockTask(Ret ret) {
}
private void setLogId() {
LogUtils.setLogId(getLogId() + System.currentTimeMillis());
}

@ -8,6 +8,17 @@ import java.util.Date;
import cn.hutool.core.util.ObjectUtil;
public class RedisKeyConst {
public static final String MOVE_PRODUCT_PUBLISH_BUFFER_LOCK_KEY = "move_product_publish_buffer_lock_key";
public static final String MOVE_MATERIAL_AUDIT_BUFFER_TASK_LOCK_KEY = "move_material_audit_buffer_task_lock_key";
public static final String MOVE_UNLOCK_TASK_LOCK_KEY = "move_unlock_task_lock_key";
public static final String MOVE_REPLENISH_TASK_LOCK_KEY = "move_replenish_task_lock_key";
public static final String MOVE_CHECK_AUDIT_STATUS_TIMEOUT_TASK_LOCK_KEY = "move_check_audit_status_timeout_task_lock_key";
public static final String PRODUCT_GET_UNAUTHORIZED = "product_get_unauthorized";
public static final String MOVE_PRODUCT_PUBLISH_TO_PIC_QUEUE = "move_product_publish_to_pic_queue";

@ -5,6 +5,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* redisFIFO
*/
@ -62,4 +64,44 @@ public class RedisService {
return result;
}
/**
*
*
* @param key key
* @param value value
* @param timeout
* @return
*/
public boolean tryLock(String key, String value, long timeout) {
// 使用setIfAbsent (对应于Redis的SETNX命令)来尝试设置值,成功则表示获取锁,失败表示锁已存在
boolean lock = redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
if (!lock) {
// 锁已存在,获取锁的值(当前持有锁的线程的标识)并检查是否与当前线程的标识相同
String currentValue = redisTemplate.opsForValue().get(key);
// 如果锁已过期
if (currentValue != null && currentValue.equals(value)) {
return true;
}
}
return lock;
}
/**
*
*
* @param key key
* @param value value
*/
public void unlock(String key, String value) {
try {
String currentValue = redisTemplate.opsForValue().get(key);
if (currentValue != null && currentValue.equals(value)) {
redisTemplate.delete(key);
}
} catch (Exception e) {
// 打印异常
e.printStackTrace();
}
}
}

@ -1,52 +1,20 @@
package com.ms.api.spi.move;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.doudian.open.api.product_GetRecommendCategory.ProductGetRecommendCategoryRequest;
import com.doudian.open.api.product_GetRecommendCategory.ProductGetRecommendCategoryResponse;
import com.doudian.open.api.product_GetRecommendCategory.data.CategoryDetailsItem;
import com.doudian.open.api.product_GetRecommendCategory.param.ProductGetRecommendCategoryParam;
import com.doudian.open.api.shop_getShopCategory.ShopGetShopCategoryRequest;
import com.doudian.open.api.shop_getShopCategory.ShopGetShopCategoryResponse;
import com.doudian.open.api.shop_getShopCategory.param.ShopGetShopCategoryParam;
import com.jinritemai.cloud.base.api.BaseRequest;
import com.jinritemai.cloud.base.api.BaseResponse;
import com.jinritemai.cloud.base.api.ExtensionService;
import com.jinritemai.cloud.base.api.ExtensionServiceHandler;
import com.ms.api.biz.MoveService;
import com.ms.api.bo.MoveShopSettingBO;
import com.ms.api.common.R;
import com.ms.api.common.Ret;
import com.ms.api.common.SPIBaseService;
import com.ms.api.consts.MoveConst;
import com.ms.api.common.StrObjMap;
import com.ms.api.dto.move.GetProductInfoRequestDTO;
import com.ms.api.service.CategoryService;
import com.ms.api.service.CategoryShopService;
import com.ms.api.service.MoveShopSettingService;
import com.ms.api.service.MoveSystemSourceCategoryService;
import com.ms.api.tool.DsJsonRequestTemplate;
import com.ms.dal.entity.Category;
import com.ms.dal.entity.CategoryShop;
import com.ms.dal.entity.MoveShopSetting;
import com.ms.dal.entity.MoveSystemSourceCategory;
import com.ms.api.task.MoveToSecondTaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.modelmapper.ModelMapper;
import org.modelmapper.convention.MatchingStrategies;
import org.phprpc.util.AssocArray;
import org.phprpc.util.PHPSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.annotation.Retention;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 1688
* <p>
*
* idid1688
*
@ -54,208 +22,25 @@ import java.util.regex.Pattern;
@ExtensionService("getProductInfoByProductId")
@Slf4j
public class GetProductInfoByProductId extends SPIBaseService implements ExtensionServiceHandler<GetProductInfoRequestDTO, Ret> {
@Autowired
private DsJsonRequestTemplate dsJsonRequestTemplate;
@Autowired
private MoveSystemSourceCategoryService moveSystemSourceCategoryService;
@Autowired
private CategoryService categoryService;
@Autowired
private CategoryShopService categoryShopService;
@Autowired
private MoveService moveService;
@Autowired
private MoveShopSettingService moveShopSettingService;
private MoveToSecondTaskService moveToSecondTaskService;
@Override
public BaseResponse<Ret> handle(BaseRequest<GetProductInfoRequestDTO> req) {
// ----逻辑校验----
// ----业务处理----
initHandle(req);
getAuthCode();
GetProductInfoRequestDTO fields = req.getData();
List<String> productIds = fields.getProductIds();
int limitNum = 10;
if (productIds.size() > limitNum) {
return R.ok(Ret.fail("最多支持"+limitNum+"个商品"));
}
List<Object> productInfo = new ArrayList<>();
List<String> fetchFailIds = new ArrayList<>();
for (int i = 0; i < productIds.size(); i++) {
HashMap<String, Object> params = new HashMap<>();
params.put("productId", productIds.get(i));
params.put("authCode", authCode);
String res = null;
JSONObject resObj = null;
try {
res = dsJsonRequestTemplate.execute("/micro_move/get_product_info", params);
resObj = JSON.parseObject(res);
JSONObject product = resObj.getJSONObject("productInfo");
Long sourceCategoryId = product.getLong("1688cid");
// 处理类目
String sourceCateListStr = StringUtils.join(product.getJSONArray("cateList"), ">");
log.info("productInfo:::" + sourceCategoryId.toString());
MoveSystemSourceCategory moveSystemSourceCategory = moveSystemSourceCategoryService.selectBySourceCategoryId(sourceCategoryId.toString());
if (!ObjectUtil.isEmpty(moveSystemSourceCategory)) {
PHPSerializer p = new PHPSerializer();
String content = moveSystemSourceCategory.getMatchCategoryList();
AssocArray assocArray = (AssocArray) p.unserialize(content.getBytes());
Map<String, Object> assocMap = assocArrayToHash(assocArray);
Long categoryId = this.getBestCategoryId(sourceCateListStr, assocMap);
if (ObjectUtil.isEmpty(categoryId)) {
product.put("ddCid", "");
product.put("ddCategoryList", "");
} else {
List<Long> categoryIds = this.getCategoryIdList(categoryId);
Category category = categoryService.selectByPrimaryKey(categoryId);
String path = category.getPath();
List<String> pathArr = Arrays.asList(path.split(">"));
product.put("ddCid", categoryIds);
product.put("ddCategoryList", pathArr);
}
} else {
ProductGetRecommendCategoryRequest request = new ProductGetRecommendCategoryRequest();
ProductGetRecommendCategoryParam param = request.getParam();
param.setScene("category_infer");
param.setName(product.getString("title"));
ProductGetRecommendCategoryResponse response = request.execute();
List<CategoryDetailsItem> categoryDetails = response.getData().getCategoryDetails();
if (!ObjectUtil.isEmpty(categoryDetails) && categoryDetails.size() > 0) {
log.info(categoryDetails.get(0).toString());
CategoryDetailsItem categoryDetailsItem = categoryDetails.get(0);
List<Long> categoryIds = new ArrayList<>();
List<String> pathArr = new ArrayList<>();
if (categoryDetailsItem.getCategoryDetail().getFirstCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getFirstCid());
}
if (categoryDetailsItem.getCategoryDetail().getSecondCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getSecondCid());
}
if (categoryDetailsItem.getCategoryDetail().getFirstCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getFirstCid());
}
if (categoryDetailsItem.getCategoryDetail().getFourthCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getFourthCid());
}
if (!categoryDetailsItem.getCategoryDetail().getFirstCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getFirstCname());
}
if (!categoryDetailsItem.getCategoryDetail().getSecondCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getSecondCname());
}
if (!categoryDetailsItem.getCategoryDetail().getThirdCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getThirdCname());
}
if (!categoryDetailsItem.getCategoryDetail().getFourthCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getFourthCname());
}
ShopGetShopCategoryRequest shopRequest = new ShopGetShopCategoryRequest();
ShopGetShopCategoryParam shopParam = shopRequest.getParam();
shopParam.setCid(categoryIds.get(categoryIds.size() - 2));
ShopGetShopCategoryResponse shopReponse = shopRequest.execute();
if (!ObjectUtil.isEmpty(shopReponse.getData())) {
product.put("ddCid", categoryIds);
product.put("ddCategoryList", pathArr);
} else {
product.put("ddCid", "");
product.put("ddCategoryList", "");
}
} else {
product.put("ddCid", "");
product.put("ddCategoryList", "");
}
}
// 处理价格
Double price = product.getDoubleValue("jd_price");
JSONObject skuMap = product.getJSONObject("skuMap");
Double consignPrice = 0D;
for (String key : skuMap.keySet()) {
JSONObject item = JSONObject.parseObject(skuMap.get(key).toString());
Double cPrice = item.getDoubleValue("consignPrice");
if (cPrice > consignPrice) {
consignPrice = cPrice;
}
}
if (Objects.isNull(consignPrice)) {
consignPrice = 0D;
}
MoveShopSetting moveShopSetting = moveShopSettingService.getDetailByShopId(shopId);
ModelMapper modelMapper = new ModelMapper();
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
MoveShopSettingBO condition = modelMapper.map(moveShopSetting, MoveShopSettingBO.class);
if (condition.getAliPriceType().equals(MoveConst.ALI_PRICE_TYPE_CONSIGN) && consignPrice > 0) {
price = consignPrice;
}
double mPrice = moveService.processConditionPrice(condition, price);
product.put("price", mPrice);
productInfo.add(resObj.getJSONObject("productInfo"));
log.info(res);
} catch (Exception e) {
fetchFailIds.add(productIds.get(i));
e.printStackTrace();
log.error(e.toString());
}
}
// ----逻辑校验----
Map<String, Object> result = new HashMap<>();
result.put("productList", productInfo);
result.put("fetchFailIds", fetchFailIds);
// ----业务处理----
StrObjMap params = new StrObjMap();
params.put("productIds", req.getData().getProductIds());
params.put("shopId", shopId);
params.put("authCode", authCode);
// 并发执行
Ret ret = moveToSecondTaskService.runParallelTask(params);
// ----结果返回----
return R.ok(Ret.success(result));
}
private List<Long> getCategoryIdList(Long categoryId) {
List<Long> categoryIdList = new ArrayList<>();
while (categoryId != 0) {
Category category = categoryService.selectByPrimaryKey(categoryId);
categoryIdList.add(categoryId);
categoryId = Long.valueOf(category.getParentCategoryId());
}
Collections.reverse(categoryIdList);
return categoryIdList;
}
private static Map assocArrayToHash(AssocArray assocArray) {
HashMap hashMap = assocArray.toHashMap();
Map result = new HashMap();
for (Object key : hashMap.keySet()) {
if (hashMap.get(key) instanceof AssocArray) {
result.put(key.toString(), assocArrayToHash((AssocArray) hashMap.get(key)));
} else if (hashMap.get(key) instanceof byte[]) {
result.put(key.toString(), new String((byte[]) hashMap.get(key)));
} else {
result.put(key.toString(), hashMap.get(key));
}
}
log.info(result + "result");
return result;
}
private Long getBestCategoryId(String sourceCateListStr, Map assocMap) {
Double max = 0.00;
Long categoryId = null;
for (Object key : assocMap.keySet()) {
System.out.println("key:" + key + ", value:" + assocMap.get(key));
Category category = categoryShopService.getCategoryListByCategoryId(shopId, (Long.valueOf(key.toString())));
if (!ObjectUtil.isEmpty(category)) {
Double score = this.getJaroWinklerDistance(sourceCateListStr, category.getPath());
if (score > max) {
max = score;
categoryId = Long.valueOf(key.toString());
}
}
}
return categoryId;
}
private double getJaroWinklerDistance(String first, String second) {
return StringUtils.getJaroWinklerDistance(first, second);
return R.ok(ret);
}
}

@ -79,14 +79,11 @@ public class GetProductInfoService extends SPIBaseService implements ExtensionSe
getAuthCode();
// ----参数校验----
GetProductInfoRequestDTO fields = req.getData();
// List<String> sourceItemUrls = new ArrayList<>();
List<String> sourceItemUrls = fields.getSourceItemUrls();
int limitNum = 10;
int limitNum = 100;
if (sourceItemUrls.size() > limitNum) {
return R.ok(Ret.fail("最多支持 " +limitNum+"个商品"));
}
// sourceItemUrls.add("https://detail.1688.com/offer/682805686164.html?spm=a2638t.b_90254924.bangdan-list.3.3872436cl59VWf&cosite=baidujj_pz&tracelog=p4p&_p_isad=1&clickid=3906df8adbe74c3faba6eefc496fa074&sessionid=3fa85d5fd57096560a3cfa71088c6e4a");
// sourceItemUrls.add("https://detail.1688.com/offer/682805686164.html?spm=a2638t.b_90254924.bangdan-list.3.3872436cl59VWf&cosite=baidujj_pz&tracelog=p4p&_p_isad=1&clickid=3906df8adbe74c3faba6eefc496fa074&sessionid=3fa85d5fd57096560a3cfa71088c6e4a");
List<Object> productInfo = new ArrayList<>();
List<String> fetchFailUrls = new ArrayList<>();
for (int i = 0; i < sourceItemUrls.size(); i++) {

@ -1,9 +1,13 @@
package com.ms.api.task;
import com.ms.api.common.Ret;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.RedisKeyConst;
import com.ms.api.consts.StatusConst;
import com.ms.api.paas.RedisService;
import com.ms.api.service.MaterialAuditStatusBufferService;
import com.ms.api.service.MaterialBizAuditStatusLogService;
import com.ms.api.tool.CommonTool;
import com.ms.dal.entity.MaterialAuditStatusBuffer;
import com.ms.dal.entity.MaterialBizAuditStatusLog;
import lombok.extern.slf4j.Slf4j;
@ -30,15 +34,15 @@ import java.util.stream.Collectors;
@Slf4j
public class CheckAuditStatusTimeoutTaskService extends TaskBaseService {
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
MaterialAuditStatusBufferService materialAuditStatusBufferService;
@Autowired
MaterialBizAuditStatusLogService materialBizAuditStatusLogService;
@Autowired
RedisService redisService;
/**
*
*/
@ -80,6 +84,22 @@ public class CheckAuditStatusTimeoutTaskService extends TaskBaseService {
super.runTask();
}
@Override
protected Ret lockTask() {
log.info("try lock checkAuditStatusTimeoutTask");
boolean isLock = redisService.tryLock(RedisKeyConst.MOVE_CHECK_AUDIT_STATUS_TIMEOUT_TASK_LOCK_KEY,Thread.currentThread().getName(),60 * 10);
if(!isLock){
return CommonTool.failResult("lock fail");
}
log.info("locked checkAuditStatusTimeoutTask");
return CommonTool.successResult();
}
@Override
protected void unlockTask(Ret ret) {
redisService.unlock(RedisKeyConst.MOVE_CHECK_AUDIT_STATUS_TIMEOUT_TASK_LOCK_KEY,Thread.currentThread().getName());
}
/**
*
*/

@ -1,11 +1,13 @@
package com.ms.api.task;
import com.ms.api.common.Ret;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.CommonConst;
import com.ms.api.consts.RedisKeyConst;
import com.ms.api.paas.RedisService;
import com.ms.api.service.MaterialAuditStatusBufferService;
import com.ms.api.service.MaterialAuditStatusQueueService;
import com.ms.api.tool.CommonTool;
import com.ms.dal.entity.MaterialAuditStatusBuffer;
import com.ms.dal.entity.MaterialAuditStatusQueue;
import lombok.extern.slf4j.Slf4j;
@ -88,6 +90,22 @@ public class MoveMaterialAuditBufferTaskService extends TaskBaseService {
super.runTask();
}
@Override
protected Ret lockTask() {
log.info("try lock moveMaterialAuditBuffer");
boolean isLock = redisService.tryLock(RedisKeyConst.MOVE_MATERIAL_AUDIT_BUFFER_TASK_LOCK_KEY,Thread.currentThread().getName(),60 * 10);
if(!isLock){
return CommonTool.failResult("lock fail");
}
log.info("locked moveMaterialAuditBuffer");
return CommonTool.successResult();
}
@Override
protected void unlockTask(Ret ret) {
redisService.unlock(RedisKeyConst.MOVE_MATERIAL_AUDIT_BUFFER_TASK_LOCK_KEY,Thread.currentThread().getName());
}
/**
*
*/

@ -31,11 +31,11 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.*;
import java.util.Arrays;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.Executor;
/**
@ -73,10 +73,14 @@ public class MoveMaterialAuditProcessTaskService extends TaskBaseService {
}
@Override
public boolean isCirculate() {return true;}
public boolean isCirculate() {
return true;
}
@Override
public int getIntervalTimeSecond() {return 1;}
public int getIntervalTimeSecond() {
return 1;
}
/**
*
@ -113,7 +117,7 @@ public class MoveMaterialAuditProcessTaskService extends TaskBaseService {
public Object getTask() {
log.info("start moveMaterialAuditProcess");
String taskDetailIdStr = redisService.pop(RedisKeyConst.MATERIAL_AUDIT_STATUS_QUEUE);
if (StrUtil.isBlank(taskDetailIdStr)){
if (StrUtil.isBlank(taskDetailIdStr)) {
E.throwMSException("no redis moveMaterialAuditQueue key");
}
long queueId = Long.parseLong(taskDetailIdStr);
@ -233,18 +237,14 @@ public class MoveMaterialAuditProcessTaskService extends TaskBaseService {
Ret processRet = (Ret) params;
MaterialAuditStatusQueue queueMsg = (MaterialAuditStatusQueue) processRet.getQueueMsg();
// TODO 事务测试
// DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// TransactionStatus status = transactionManager.getTransaction(def);
// 数据库操作
try {
if (CommonTool.isFailRet(processRet) && !Objects.isNull(processRet.getCode()) && processRet.getCode().equals("waitAuditStatus")) {
moveMaterialAuditStatusQueueToBuffer(queueMsg);
}
// 删除queue
materialAuditStatusQueueService.deleteByPrimaryKey(queueMsg.getMaterialAuditStatusQueueId());
// transactionManager.commit(status);
} catch (Throwable e) {
// transactionManager.rollback(status);
log.info("任务执行失败 数据回滚: ", e);
}
}

@ -3,6 +3,7 @@ package com.ms.api.task;
import cn.hutool.core.util.ObjectUtil;
import com.ms.api.bo.MovePublishBufferRandShopIdAndPriorityBO;
import com.ms.api.common.E;
import com.ms.api.common.Ret;
import com.ms.api.common.StrObjMap;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.RedisKeyConst;
@ -96,6 +97,22 @@ public class MovePublishBufferTaskService extends TaskBaseService {
super.runTask();
}
@Override
protected Ret lockTask() {
log.info("try lock movePublishBufferTask");
boolean isLock = redisService.tryLock(RedisKeyConst.MOVE_PRODUCT_PUBLISH_BUFFER_LOCK_KEY,Thread.currentThread().getName(),60 * 10);
if(!isLock){
return CommonTool.failResult("lock fail");
}
log.info("locked movePublishBufferTask");
return CommonTool.successResult();
}
@Override
protected void unlockTask(Ret ret) {
redisService.unlock(RedisKeyConst.MOVE_PRODUCT_PUBLISH_BUFFER_LOCK_KEY,Thread.currentThread().getName());
}
@Override
public Object getTask() {
log.info("start movePublishBufferTask");
@ -368,9 +385,7 @@ public class MovePublishBufferTaskService extends TaskBaseService {
// 分批把删除buffer中的数据放入到queue中同时放入到redis中
List<Map<Integer, MoveProductPublishBuffer>> chunkBufferRowList = CommonTool.chunkMap(allBufferRowList, 50);
for (Map<Integer, MoveProductPublishBuffer> bufferRowList : chunkBufferRowList) {
// TODO 事务测试
// DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// TransactionStatus status = transactionManager.getTransaction(def);
// 数据库操作
try {
log.info("start addPublishQueueBatch move_product_publish_buffer bufferRowList total:" + bufferRowList.size());
int affRet = addPublishQueueBatch(bufferRowList);
@ -388,10 +403,8 @@ public class MovePublishBufferTaskService extends TaskBaseService {
redisTotal[0] += addPublishQueueToRedis(v.getPriority(), v.getMoveCollectTaskDetailId());
});
log.info("end addPublishQueueToRedis redis Queue Total[" + redisTotal[0] + "]");
// transactionManager.commit(status);
} catch (Exception e) {
shopBufferCountMap = null;
// transactionManager.rollback(status);
log.error("把删除buffer中的数据放入到queue中同时放入到redis中 任务执行失败 数据回滚: ", e);
}
}

@ -1,8 +1,12 @@
package com.ms.api.task;
import com.ms.api.biz.MoveService;
import com.ms.api.common.Ret;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.RedisKeyConst;
import com.ms.api.paas.RedisService;
import com.ms.api.service.QueueService;
import com.ms.api.tool.CommonTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -28,6 +32,9 @@ public class MoveReplenishTaskService extends TaskBaseService {
@Autowired
QueueService unlockTimeoutQueue;
@Autowired
RedisService redisService;
/**
*
*/
@ -63,6 +70,22 @@ public class MoveReplenishTaskService extends TaskBaseService {
super.runTask();
}
@Override
protected Ret lockTask() {
log.info("try lock moveReplenishTask");
boolean isLock = redisService.tryLock(RedisKeyConst.MOVE_REPLENISH_TASK_LOCK_KEY,Thread.currentThread().getName(),60 * 10);
if(!isLock){
return CommonTool.failResult("lock fail");
}
log.info("locked moveReplenishTask");
return CommonTool.successResult();
}
@Override
protected void unlockTask(Ret ret) {
redisService.unlock(RedisKeyConst.MOVE_REPLENISH_TASK_LOCK_KEY,Thread.currentThread().getName());
}
/**
*
*/

@ -0,0 +1,341 @@
package com.ms.api.task;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.doudian.open.api.product_GetRecommendCategory.ProductGetRecommendCategoryRequest;
import com.doudian.open.api.product_GetRecommendCategory.ProductGetRecommendCategoryResponse;
import com.doudian.open.api.product_GetRecommendCategory.data.CategoryDetailsItem;
import com.doudian.open.api.product_GetRecommendCategory.param.ProductGetRecommendCategoryParam;
import com.doudian.open.api.shop_getShopCategory.ShopGetShopCategoryRequest;
import com.doudian.open.api.shop_getShopCategory.ShopGetShopCategoryResponse;
import com.doudian.open.api.shop_getShopCategory.param.ShopGetShopCategoryParam;
import com.jinritemai.cloud.base.core.util.AuthThreadLocalUtil;
import com.ms.api.biz.MoveService;
import com.ms.api.bo.MoveShopSettingBO;
import com.ms.api.common.Ret;
import com.ms.api.common.StrObjMap;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.MoveConst;
import com.ms.api.service.CategoryService;
import com.ms.api.service.CategoryShopService;
import com.ms.api.service.MoveShopSettingService;
import com.ms.api.service.MoveSystemSourceCategoryService;
import com.ms.api.tool.CommonTool;
import com.ms.api.tool.DsJsonRequestTemplate;
import com.ms.dal.entity.Category;
import com.ms.dal.entity.MoveShopSetting;
import com.ms.dal.entity.MoveSystemSourceCategory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.modelmapper.ModelMapper;
import org.modelmapper.convention.MatchingStrategies;
import org.phprpc.util.AssocArray;
import org.phprpc.util.PHPSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
/**
*
*
*/
@Configuration
@Component
@Slf4j
public class MoveToSecondTaskService extends TaskBaseService {
@Autowired
private DsJsonRequestTemplate dsJsonRequestTemplate;
@Autowired
private MoveSystemSourceCategoryService moveSystemSourceCategoryService;
@Autowired
private CategoryService categoryService;
@Autowired
private CategoryShopService categoryShopService;
@Autowired
private MoveService moveService;
@Autowired
private MoveShopSettingService moveShopSettingService;
/**
* ,
*/
@Override
public int getCorePoolSiz() {
return 50;
}
/**
* 线
*/
@Override
public int getMaxPoolSize() {
return 1000;
}
/**
* 5050线maxPoolSize
* 线
*/
@Override
public int getQueueCapacity() {
return 50;
}
/**
*
*/
public String getTaskExecutorName() {
return "moveToSecondTaskPool";
}
@Bean(name = "moveToSecondTaskPool")
@Override
public Executor getAsyncExecutor() {
return super.getAsyncExecutor();
}
@Resource(name = "moveToSecondTaskPool")
protected Executor taskPool;
@Override
protected Executor getTaskPool() {
return taskPool;
}
public Ret runParallelTask(Object params) {
if (ObjectUtil.isEmpty(params)) {
return CommonTool.failResult("参数不能为空");
}
StrObjMap fields = (StrObjMap) params;
List<String> productIds = (List<String>) fields.get("productIds");
Long shopId = (Long) fields.get("shopId");
String authCode = (String) fields.get("authCode");
int limitNum = 100;
if (productIds.size() > limitNum) {
return Ret.fail("最多支持" + limitNum + "个商品");
}
List<Object> productInfo = new ArrayList<>();
List<String> fetchFailIds = new ArrayList<>();
// 并发执行
ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) getTaskPool();
List<Future<JSONObject>> futures = new ArrayList<>();
for (int i = 0; i < productIds.size(); i++) {
int finalI = i;
futures.add(taskExecutor.submit(() -> {
log.info("当前任务运行在线程: " + Thread.currentThread().getName() + "-" + finalI);
AuthThreadLocalUtil.set(String.valueOf(shopId));
HashMap<String, Object> mParams = new HashMap<>();
mParams.put("productId", productIds.get(finalI));
mParams.put("authCode", authCode);
String res = null;
JSONObject resObj = null;
try {
res = dsJsonRequestTemplate.execute("/micro_move/get_product_info", mParams);
resObj = JSON.parseObject(res);
JSONObject product = resObj.getJSONObject("productInfo");
Long sourceCategoryId = product.getLong("1688cid");
// 处理类目
String sourceCateListStr = StringUtils.join(product.getJSONArray("cateList"), ">");
log.info("productInfo:::" + sourceCategoryId.toString());
MoveSystemSourceCategory moveSystemSourceCategory = moveSystemSourceCategoryService.selectBySourceCategoryId(sourceCategoryId.toString());
if (!ObjectUtil.isEmpty(moveSystemSourceCategory)) {
PHPSerializer p = new PHPSerializer();
String content = moveSystemSourceCategory.getMatchCategoryList();
AssocArray assocArray = (AssocArray) p.unserialize(content.getBytes());
Map<String, Object> assocMap = assocArrayToHash(assocArray);
Long categoryId = this.getBestCategoryId(sourceCateListStr, assocMap, shopId);
if (ObjectUtil.isEmpty(categoryId)) {
product.put("ddCid", "");
product.put("ddCategoryList", "");
} else {
List<Long> categoryIds = this.getCategoryIdList(categoryId);
Category category = categoryService.selectByPrimaryKey(categoryId);
String path = category.getPath();
List<String> pathArr = Arrays.asList(path.split(">"));
product.put("ddCid", categoryIds);
product.put("ddCategoryList", pathArr);
}
} else {
ProductGetRecommendCategoryRequest request = new ProductGetRecommendCategoryRequest();
ProductGetRecommendCategoryParam param = request.getParam();
param.setScene("category_infer");
param.setName(product.getString("title"));
ProductGetRecommendCategoryResponse response = request.execute();
List<CategoryDetailsItem> categoryDetails = response.getData().getCategoryDetails();
if (!ObjectUtil.isEmpty(categoryDetails) && categoryDetails.size() > 0) {
log.info(categoryDetails.get(0).toString());
CategoryDetailsItem categoryDetailsItem = categoryDetails.get(0);
List<Long> categoryIds = new ArrayList<>();
List<String> pathArr = new ArrayList<>();
if (categoryDetailsItem.getCategoryDetail().getFirstCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getFirstCid());
}
if (categoryDetailsItem.getCategoryDetail().getSecondCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getSecondCid());
}
if (categoryDetailsItem.getCategoryDetail().getFirstCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getFirstCid());
}
if (categoryDetailsItem.getCategoryDetail().getFourthCid() != 0) {
categoryIds.add(categoryDetailsItem.getCategoryDetail().getFourthCid());
}
if (!categoryDetailsItem.getCategoryDetail().getFirstCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getFirstCname());
}
if (!categoryDetailsItem.getCategoryDetail().getSecondCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getSecondCname());
}
if (!categoryDetailsItem.getCategoryDetail().getThirdCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getThirdCname());
}
if (!categoryDetailsItem.getCategoryDetail().getFourthCname().isEmpty()) {
pathArr.add(categoryDetailsItem.getCategoryDetail().getFourthCname());
}
ShopGetShopCategoryRequest shopRequest = new ShopGetShopCategoryRequest();
ShopGetShopCategoryParam shopParam = shopRequest.getParam();
shopParam.setCid(categoryIds.get(categoryIds.size() - 2));
ShopGetShopCategoryResponse shopReponse = shopRequest.execute();
if (!ObjectUtil.isEmpty(shopReponse.getData())) {
product.put("ddCid", categoryIds);
product.put("ddCategoryList", pathArr);
} else {
product.put("ddCid", "");
product.put("ddCategoryList", "");
}
} else {
product.put("ddCid", "");
product.put("ddCategoryList", "");
}
}
// 处理价格
Double price = product.getDoubleValue("jd_price");
JSONObject skuMap = product.getJSONObject("skuMap");
Double consignPrice = 0D;
for (String key : skuMap.keySet()) {
JSONObject item = JSONObject.parseObject(skuMap.get(key).toString());
Double cPrice = item.getDoubleValue("consignPrice");
if (cPrice > consignPrice) {
consignPrice = cPrice;
}
}
if (Objects.isNull(consignPrice)) {
consignPrice = 0D;
}
MoveShopSetting moveShopSetting = moveShopSettingService.getDetailByShopId(shopId);
ModelMapper modelMapper = new ModelMapper();
modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
MoveShopSettingBO condition = modelMapper.map(moveShopSetting, MoveShopSettingBO.class);
if (condition.getAliPriceType().equals(MoveConst.ALI_PRICE_TYPE_CONSIGN) && consignPrice > 0) {
price = consignPrice;
}
double mPrice = moveService.processConditionPrice(condition, price);
product.put("price", mPrice);
log.info(res);
return resObj.getJSONObject("productInfo");
} catch (Exception e) {
fetchFailIds.add(productIds.get(finalI));
log.error("获取商品信息失败,异常:", e);
}
return null;
}));
}
for (Future<JSONObject> future : futures) {
try {
JSONObject pInfo = future.get();
if(ObjectUtil.isNotEmpty(pInfo)){
productInfo.add(pInfo);
}
} catch (Throwable e) {
log.info("MoveToSecondTaskService get futures exception:", e);
}
}
Ret ret = CommonTool.successResult();
Map<String, Object> result = new HashMap<>();
result.put("productList", productInfo);
result.put("fetchFailIds", fetchFailIds);
ret.setData(result);
return ret;
}
private List<Long> getCategoryIdList(Long categoryId) {
List<Long> categoryIdList = new ArrayList<>();
while (categoryId != 0) {
Category category = categoryService.selectByPrimaryKey(categoryId);
categoryIdList.add(categoryId);
categoryId = Long.valueOf(category.getParentCategoryId());
}
Collections.reverse(categoryIdList);
return categoryIdList;
}
private static Map assocArrayToHash(AssocArray assocArray) {
HashMap hashMap = assocArray.toHashMap();
Map result = new HashMap();
for (Object key : hashMap.keySet()) {
if (hashMap.get(key) instanceof AssocArray) {
result.put(key.toString(), assocArrayToHash((AssocArray) hashMap.get(key)));
} else if (hashMap.get(key) instanceof byte[]) {
result.put(key.toString(), new String((byte[]) hashMap.get(key)));
} else {
result.put(key.toString(), hashMap.get(key));
}
}
log.info(result + "result");
return result;
}
private Long getBestCategoryId(String sourceCateListStr, Map assocMap, Long shopId) {
Double max = 0.00;
Long categoryId = null;
for (Object key : assocMap.keySet()) {
System.out.println("key:" + key + ", value:" + assocMap.get(key));
Category category = categoryShopService.getCategoryListByCategoryId(shopId, (Long.valueOf(key.toString())));
if (!ObjectUtil.isEmpty(category)) {
Double score = this.getJaroWinklerDistance(sourceCateListStr, category.getPath());
if (score > max) {
max = score;
categoryId = Long.valueOf(key.toString());
}
}
}
return categoryId;
}
private double getJaroWinklerDistance(String first, String second) {
return StringUtils.getJaroWinklerDistance(first, second);
}
@Override
public Object getTask() {
return null;
}
@Override
public Object processTask(Object params) {
return null;
}
@Override
public void clearTask(Object params) {
}
}

@ -1,8 +1,12 @@
package com.ms.api.task;
import com.ms.api.biz.MoveService;
import com.ms.api.common.Ret;
import com.ms.api.common.TaskBaseService;
import com.ms.api.consts.RedisKeyConst;
import com.ms.api.paas.RedisService;
import com.ms.api.service.QueueService;
import com.ms.api.tool.CommonTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -28,6 +32,9 @@ public class MoveUnlockTaskService extends TaskBaseService {
@Autowired
QueueService unlockTimeoutQueue;
@Autowired
RedisService redisService;
/**
*
*/
@ -63,6 +70,22 @@ public class MoveUnlockTaskService extends TaskBaseService {
super.runTask();
}
@Override
protected Ret lockTask() {
log.info("try lock moveUnlockTask");
boolean isLock = redisService.tryLock(RedisKeyConst.MOVE_UNLOCK_TASK_LOCK_KEY,Thread.currentThread().getName(),60 * 10);
if(!isLock){
return CommonTool.failResult("lock fail");
}
log.info("locked moveUnlockTask");
return CommonTool.successResult();
}
@Override
protected void unlockTask(Ret ret) {
redisService.unlock(RedisKeyConst.MOVE_UNLOCK_TASK_LOCK_KEY,Thread.currentThread().getName());
}
/**
*
*/

@ -1396,7 +1396,6 @@ public class ProcessMovePublishToPicTaskService extends TaskBaseService {
}
private Ret checkAndRetryMoveProductPublishQueue(MoveProductPublishToPicQueueBO queueMsg, Ret addProductRet) {
// TODO
Ret ret = CommonTool.failResult();
ret.setQueueMsg(queueMsg);
return ret;

@ -956,7 +956,6 @@ public class ProcessProductPublishTaskService extends TaskBaseService {
}
Collections.sort(percentages);
// TODO
// if (sourceValue > percentages.get(0) && sourceValue <= percentages.get(1)) {
// matchVids.put(0D, attValue.getValue());
// break;
@ -1075,7 +1074,6 @@ public class ProcessProductPublishTaskService extends TaskBaseService {
if (matchVids.size() >= attribute.getMultiSelectMax() || matchVids.size() >= 1) {
matchVids = new LinkedHashMap<>(CommonTool.sliceMap(matchVids, attribute.getMultiSelectMax().intValue()));
// TODO
//return finishMatch(matchVids, matchSourceAttrs);
}
for (OptionsItem attValue : options) {
@ -1120,7 +1118,6 @@ public class ProcessProductPublishTaskService extends TaskBaseService {
List<StrObjMap> matchAttrList = new ArrayList<>();
if (isReq && matchVids.isEmpty()) {
//TODO 这里没有字段
//matchVids.put((Integer) maxMatchAttr.get("attrValueID"), maxMatchAttr);
}
for (OptionsItem matchVid : matchVids.values()) {

Loading…
Cancel
Save