You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
202 lines
8.5 KiB
PHP
202 lines
8.5 KiB
PHP
<?php
|
|
|
|
class QueueService {
|
|
|
|
/**
|
|
*
|
|
* @var QueueDao
|
|
*/
|
|
private $queueDao;
|
|
|
|
private $db;
|
|
private $solidRedis;
|
|
|
|
public function __construct() {
|
|
$this->queueDao = Zc::singleton('QueueDao');
|
|
|
|
$this->db = Zc::getDb();
|
|
$this->solidRedis = RedisExt::factory('solidCache');
|
|
}
|
|
|
|
public function getQueueMsgList($queueName, $shopId) {
|
|
return $this->queueDao->getQueueMsgList($queueName, $shopId);
|
|
}
|
|
|
|
public function getQueueMsg($queueName, $queueId) {
|
|
return $this->queueDao->getQueueMsg($queueName, $queueId);
|
|
}
|
|
|
|
public function addQueueMsg($queueName, $shopId, $app, $accessToken) {
|
|
return $this->queueDao->addQueueMsg($queueName, $shopId, $app, $accessToken);
|
|
}
|
|
|
|
public function lockQueueMsg($timerLockId, $filter) {
|
|
return $this->queueDao->lockQueueMsg($timerLockId, $filter);
|
|
}
|
|
|
|
public function unlockTimeoutQueue($queueName, $expiredSeconds = 300, $status = null) {
|
|
return $this->queueDao->unlockTimeoutQueue($queueName, $expiredSeconds, $status);
|
|
}
|
|
|
|
public function unlockTimeoutSolidRedisQueue($queueName, $heartbeatExpiredSeconds = 300, $status = null) {
|
|
$c1 = $this->queueDao->unlockSolidRedisDeadQueue($queueName, $heartbeatExpiredSeconds, $status);
|
|
$c2 = $this->queueDao->unlockSolidRedisSkipQueue($queueName, $status);
|
|
return $c1 + $c2;
|
|
}
|
|
|
|
public function deleteQueueMsg($queueName, $row) {
|
|
return $this->queueDao->deleteQueueMsg($queueName, $row);
|
|
}
|
|
|
|
public function lockPriorityQueue($timerLockId, $queueName, $filter) {
|
|
return $this->queueDao->lockPriorityQueue($timerLockId, $queueName, $filter);
|
|
}
|
|
|
|
public function lockFifoQueue($timerLockId, $queueName, $filter) {
|
|
return $this->queueDao->lockFifoQueue($timerLockId, $queueName, $filter);
|
|
}
|
|
|
|
public function lockVenderAvgQueue($timerLockId, $queueName, $filter) {
|
|
return $this->queueDao->lockVenderAvgQueue($timerLockId, $queueName, $filter);
|
|
}
|
|
|
|
public function lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf) {
|
|
return $this->queueDao->lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf);
|
|
}
|
|
|
|
public function batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount) {
|
|
return $this->queueDao->batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount);
|
|
}
|
|
|
|
public function lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn) {
|
|
return $this->queueDao->lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn);
|
|
}
|
|
|
|
public function allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount = 300, $shopLimit = null) {
|
|
return $this->queueDao->allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount, $shopLimit);
|
|
}
|
|
|
|
public function moveBufferToSolidRedisQueue($bufferTableName, $queueTableName, $filter, $maxQueueCount = 2000) {
|
|
list($realDbId, $bufferTbl) = DbRoute::getDbAndTbl($bufferTableName);
|
|
list($realDbId, $queueTbl) = DbRoute::getDbAndTbl($queueTableName);
|
|
|
|
$wheres = [];
|
|
if ($filter['gmtExec']) {
|
|
$wheres[] = $this->db->prepare('and (gmt_exec is null or gmt_exec <= %s)', $filter['gmtExec']);
|
|
}
|
|
if ($filter['includeShopIds']) {
|
|
$wheres[] = $this->db->prepare('AND shop_id IN %li', $filter['includeShopIds']);
|
|
}
|
|
|
|
$queueCount = $this->db->useDbIdOnce($realDbId)->queryFirstField('select count(*) from %b', $queueTbl);
|
|
if ($queueCount > $maxQueueCount) {
|
|
return false;
|
|
}
|
|
|
|
$needMoveQueueCount = $maxQueueCount - $queueCount;
|
|
$limit = 100;
|
|
$stopFilePath = Zc::C(ZcConfigConst::LogDir) . "stop_do_move_{$bufferTableName}_to_queue";
|
|
$bufferPrimaryField = QueueConst::getQueueIdColumnName($bufferTbl);
|
|
$startBufferId = 0;
|
|
while (true) {
|
|
if (file_exists($stopFilePath)) {
|
|
break;
|
|
}
|
|
if ($needMoveQueueCount <= 0) {
|
|
break;
|
|
}
|
|
|
|
$loopWheres = $wheres;
|
|
$loopWheres[] = $this->db->prepare('AND %l > %i', $bufferPrimaryField, $startBufferId);
|
|
$whereStr = implode(' ', $loopWheres);
|
|
|
|
$loopLimit = $needMoveQueueCount > $limit ? $limit : $needMoveQueueCount;
|
|
$bufferList = $this->db->useDbIdOnce($realDbId)->query('select * from %b where 1=1 %l order by %l asc limit %i', $bufferTbl, $whereStr, $bufferPrimaryField, $loopLimit);
|
|
if (empty($bufferList)) {
|
|
break;
|
|
}
|
|
$deleteBufferIds = [];
|
|
$queueDatas = [];
|
|
foreach ($bufferList as $bufferData) {
|
|
$startBufferId = $bufferData[$bufferPrimaryField];
|
|
if (!$filter['includeShopIds'] && CommonTool::isCurrAdminSelfShop($bufferData['shop_id'])) {
|
|
continue;
|
|
}
|
|
|
|
$deleteBufferIds[] = $bufferData[$bufferPrimaryField];
|
|
|
|
$tmpBufferData = $bufferData;
|
|
unset($tmpBufferData['gmt_exec']);
|
|
unset($tmpBufferData[$bufferPrimaryField]);
|
|
$tmpBufferData['locked'] = 0;
|
|
$tmpBufferData['gmt_create'] = ZcDbEval::now();
|
|
$tmpBufferData['gmt_modified'] = ZcDbEval::now();
|
|
$queueDatas[] = $tmpBufferData;
|
|
}
|
|
|
|
$oldErrorMode = $this->db->setErrorMode(ZcDb::ERROR_MODE_EXCEPTION);
|
|
$transStatus = $this->db->useDbIdOnce($realDbId)->startTransaction();
|
|
try {
|
|
if ($queueDatas) {
|
|
$this->db->useDbIdOnce($realDbId)->insert($queueTbl, $queueDatas);
|
|
$startQueueId = $this->db->lastInsertId();
|
|
}
|
|
|
|
if ($deleteBufferIds) {
|
|
$this->db->useDbIdOnce($realDbId)->delete($bufferTbl, '%l IN %li', $bufferPrimaryField, $deleteBufferIds);
|
|
}
|
|
|
|
$this->db->useDbIdOnce($realDbId)->commit($transStatus);
|
|
$this->db->setErrorMode($oldErrorMode);
|
|
} catch (Exception $e) {
|
|
$this->db->useDbIdOnce($realDbId)->rollback($transStatus);
|
|
$this->db->setErrorMode($oldErrorMode);
|
|
break;
|
|
}
|
|
|
|
$selfQueueIds = $notSelfQueueIds = [];
|
|
if (!empty($startQueueId)) {
|
|
$queueCount = count($queueDatas);
|
|
$queueIdLoop = 0;
|
|
for ($queueId = $startQueueId; $queueId < ($queueCount + $startQueueId); $queueId ++) {
|
|
$loopQueueData = $queueDatas[$queueIdLoop];
|
|
|
|
if (CommonTool::isCurrAdminSelfShop($loopQueueData['shop_id'])) {
|
|
$selfQueueIds[] = $queueId;
|
|
} else {
|
|
$notSelfQueueIds[] = $queueId;
|
|
}
|
|
$queueIdLoop ++;
|
|
}
|
|
}
|
|
Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("loop startBufferId[{$startBufferId}]");
|
|
if ($selfQueueIds) {
|
|
$queueRedisKey = RedisKeyConst::getQueueRedisKey($queueTbl, true);
|
|
$redisRet = $this->solidRedis->lPush($queueRedisKey, ...$selfQueueIds);
|
|
if (false === $redisRet) {
|
|
/** @var Exception $ex */
|
|
$ex = $this->solidRedis->lastException();
|
|
$reason = $ex ? $ex->getMessage() : "emptyEx";
|
|
Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("addQueueIdsToSolidRedisQueue[$queueRedisKey] -> reason : " . $reason);
|
|
}
|
|
}
|
|
if ($notSelfQueueIds) {
|
|
$queueRedisKey = RedisKeyConst::getQueueRedisKey($queueTbl, false);
|
|
$redisRet = $this->solidRedis->lPush($queueRedisKey, ...$notSelfQueueIds);
|
|
if (false === $redisRet) {
|
|
/** @var Exception $ex */
|
|
$ex = $this->solidRedis->lastException();
|
|
$reason = $ex ? $ex->getMessage() : "emptyEx";
|
|
Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("addQueueIdsToSolidRedisQueue[$queueRedisKey] -> reason : " . $reason);
|
|
}
|
|
}
|
|
|
|
$needMoveQueueCount -= count($deleteBufferIds);
|
|
if ($needMoveQueueCount <= 0 || count($bufferList) < $limit) {
|
|
break;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
}
|