传源码

20230922-ljl-fixBug
wangchaoxu 1 year ago
parent 2f3cf72b2b
commit e329d15bd4

@ -0,0 +1,201 @@
<?php
class QueueService {
/**
*
* @var QueueDao
*/
private $queueDao;
private $db;
private $solidRedis;
public function __construct() {
$this->queueDao = Zc::singleton('QueueDao');
$this->db = Zc::getDb();
$this->solidRedis = RedisExt::factory('solidCache');
}
public function getQueueMsgList($queueName, $shopId) {
return $this->queueDao->getQueueMsgList($queueName, $shopId);
}
public function getQueueMsg($queueName, $queueId) {
return $this->queueDao->getQueueMsg($queueName, $queueId);
}
public function addQueueMsg($queueName, $shopId, $app, $accessToken) {
return $this->queueDao->addQueueMsg($queueName, $shopId, $app, $accessToken);
}
public function lockQueueMsg($timerLockId, $filter) {
return $this->queueDao->lockQueueMsg($timerLockId, $filter);
}
public function unlockTimeoutQueue($queueName, $expiredSeconds = 300, $status = null) {
return $this->queueDao->unlockTimeoutQueue($queueName, $expiredSeconds, $status);
}
public function unlockTimeoutSolidRedisQueue($queueName, $heartbeatExpiredSeconds = 300, $status = null) {
$c1 = $this->queueDao->unlockSolidRedisDeadQueue($queueName, $heartbeatExpiredSeconds, $status);
$c2 = $this->queueDao->unlockSolidRedisSkipQueue($queueName, $status);
return $c1 + $c2;
}
public function deleteQueueMsg($queueName, $row) {
return $this->queueDao->deleteQueueMsg($queueName, $row);
}
public function lockPriorityQueue($timerLockId, $queueName, $filter) {
return $this->queueDao->lockPriorityQueue($timerLockId, $queueName, $filter);
}
public function lockFifoQueue($timerLockId, $queueName, $filter) {
return $this->queueDao->lockFifoQueue($timerLockId, $queueName, $filter);
}
public function lockVenderAvgQueue($timerLockId, $queueName, $filter) {
return $this->queueDao->lockVenderAvgQueue($timerLockId, $queueName, $filter);
}
public function lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf) {
return $this->queueDao->lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf);
}
public function batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount) {
return $this->queueDao->batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount);
}
public function lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn) {
return $this->queueDao->lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn);
}
public function allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount = 300, $shopLimit = null) {
return $this->queueDao->allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount, $shopLimit);
}
public function moveBufferToSolidRedisQueue($bufferTableName, $queueTableName, $filter, $maxQueueCount = 2000) {
list($realDbId, $bufferTbl) = DbRoute::getDbAndTbl($bufferTableName);
list($realDbId, $queueTbl) = DbRoute::getDbAndTbl($queueTableName);
$wheres = [];
if ($filter['gmtExec']) {
$wheres[] = $this->db->prepare('and (gmt_exec is null or gmt_exec <= %s)', $filter['gmtExec']);
}
if ($filter['includeShopIds']) {
$wheres[] = $this->db->prepare('AND shop_id IN %li', $filter['includeShopIds']);
}
$queueCount = $this->db->useDbIdOnce($realDbId)->queryFirstField('select count(*) from %b', $queueTbl);
if ($queueCount > $maxQueueCount) {
return false;
}
$needMoveQueueCount = $maxQueueCount - $queueCount;
$limit = 100;
$stopFilePath = Zc::C(ZcConfigConst::LogDir) . "stop_do_move_{$bufferTableName}_to_queue";
$bufferPrimaryField = QueueConst::getQueueIdColumnName($bufferTbl);
$startBufferId = 0;
while (true) {
if (file_exists($stopFilePath)) {
break;
}
if ($needMoveQueueCount <= 0) {
break;
}
$loopWheres = $wheres;
$loopWheres[] = $this->db->prepare('AND %l > %i', $bufferPrimaryField, $startBufferId);
$whereStr = implode(' ', $loopWheres);
$loopLimit = $needMoveQueueCount > $limit ? $limit : $needMoveQueueCount;
$bufferList = $this->db->useDbIdOnce($realDbId)->query('select * from %b where 1=1 %l order by %l asc limit %i', $bufferTbl, $whereStr, $bufferPrimaryField, $loopLimit);
if (empty($bufferList)) {
break;
}
$deleteBufferIds = [];
$queueDatas = [];
foreach ($bufferList as $bufferData) {
$startBufferId = $bufferData[$bufferPrimaryField];
if (!$filter['includeShopIds'] && CommonTool::isCurrAdminSelfShop($bufferData['shop_id'])) {
continue;
}
$deleteBufferIds[] = $bufferData[$bufferPrimaryField];
$tmpBufferData = $bufferData;
unset($tmpBufferData['gmt_exec']);
unset($tmpBufferData[$bufferPrimaryField]);
$tmpBufferData['locked'] = 0;
$tmpBufferData['gmt_create'] = ZcDbEval::now();
$tmpBufferData['gmt_modified'] = ZcDbEval::now();
$queueDatas[] = $tmpBufferData;
}
$oldErrorMode = $this->db->setErrorMode(ZcDb::ERROR_MODE_EXCEPTION);
$transStatus = $this->db->useDbIdOnce($realDbId)->startTransaction();
try {
if ($queueDatas) {
$this->db->useDbIdOnce($realDbId)->insert($queueTbl, $queueDatas);
$startQueueId = $this->db->lastInsertId();
}
if ($deleteBufferIds) {
$this->db->useDbIdOnce($realDbId)->delete($bufferTbl, '%l IN %li', $bufferPrimaryField, $deleteBufferIds);
}
$this->db->useDbIdOnce($realDbId)->commit($transStatus);
$this->db->setErrorMode($oldErrorMode);
} catch (Exception $e) {
$this->db->useDbIdOnce($realDbId)->rollback($transStatus);
$this->db->setErrorMode($oldErrorMode);
break;
}
$selfQueueIds = $notSelfQueueIds = [];
if (!empty($startQueueId)) {
$queueCount = count($queueDatas);
$queueIdLoop = 0;
for ($queueId = $startQueueId; $queueId < ($queueCount + $startQueueId); $queueId ++) {
$loopQueueData = $queueDatas[$queueIdLoop];
if (CommonTool::isCurrAdminSelfShop($loopQueueData['shop_id'])) {
$selfQueueIds[] = $queueId;
} else {
$notSelfQueueIds[] = $queueId;
}
$queueIdLoop ++;
}
}
Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("loop startBufferId[{$startBufferId}]");
if ($selfQueueIds) {
$queueRedisKey = RedisKeyConst::getQueueRedisKey($queueTbl, true);
$redisRet = $this->solidRedis->lPush($queueRedisKey, ...$selfQueueIds);
if (false === $redisRet) {
/** @var Exception $ex */
$ex = $this->solidRedis->lastException();
$reason = $ex ? $ex->getMessage() : "emptyEx";
Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("addQueueIdsToSolidRedisQueue[$queueRedisKey] -> reason : " . $reason);
}
}
if ($notSelfQueueIds) {
$queueRedisKey = RedisKeyConst::getQueueRedisKey($queueTbl, false);
$redisRet = $this->solidRedis->lPush($queueRedisKey, ...$notSelfQueueIds);
if (false === $redisRet) {
/** @var Exception $ex */
$ex = $this->solidRedis->lastException();
$reason = $ex ? $ex->getMessage() : "emptyEx";
Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("addQueueIdsToSolidRedisQueue[$queueRedisKey] -> reason : " . $reason);
}
}
$needMoveQueueCount -= count($deleteBufferIds);
if ($needMoveQueueCount <= 0 || count($bufferList) < $limit) {
break;
}
}
return true;
}
}
Loading…
Cancel
Save