diff --git a/doc/move/service/class.QueueService.php b/doc/move/service/class.QueueService.php new file mode 100644 index 00000000..269d2f2b --- /dev/null +++ b/doc/move/service/class.QueueService.php @@ -0,0 +1,201 @@ +queueDao = Zc::singleton('QueueDao'); + + $this->db = Zc::getDb(); + $this->solidRedis = RedisExt::factory('solidCache'); + } + + public function getQueueMsgList($queueName, $shopId) { + return $this->queueDao->getQueueMsgList($queueName, $shopId); + } + + public function getQueueMsg($queueName, $queueId) { + return $this->queueDao->getQueueMsg($queueName, $queueId); + } + + public function addQueueMsg($queueName, $shopId, $app, $accessToken) { + return $this->queueDao->addQueueMsg($queueName, $shopId, $app, $accessToken); + } + + public function lockQueueMsg($timerLockId, $filter) { + return $this->queueDao->lockQueueMsg($timerLockId, $filter); + } + + public function unlockTimeoutQueue($queueName, $expiredSeconds = 300, $status = null) { + return $this->queueDao->unlockTimeoutQueue($queueName, $expiredSeconds, $status); + } + + public function unlockTimeoutSolidRedisQueue($queueName, $heartbeatExpiredSeconds = 300, $status = null) { + $c1 = $this->queueDao->unlockSolidRedisDeadQueue($queueName, $heartbeatExpiredSeconds, $status); + $c2 = $this->queueDao->unlockSolidRedisSkipQueue($queueName, $status); + return $c1 + $c2; + } + + public function deleteQueueMsg($queueName, $row) { + return $this->queueDao->deleteQueueMsg($queueName, $row); + } + + public function lockPriorityQueue($timerLockId, $queueName, $filter) { + return $this->queueDao->lockPriorityQueue($timerLockId, $queueName, $filter); + } + + public function lockFifoQueue($timerLockId, $queueName, $filter) { + return $this->queueDao->lockFifoQueue($timerLockId, $queueName, $filter); + } + + public function lockVenderAvgQueue($timerLockId, $queueName, $filter) { + return $this->queueDao->lockVenderAvgQueue($timerLockId, $queueName, $filter); + } + + public function lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf) { + return $this->queueDao->lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf); + } + + public function batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount) { + return $this->queueDao->batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount); + } + + public function lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn) { + return $this->queueDao->lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn); + } + + public function allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount = 300, $shopLimit = null) { + return $this->queueDao->allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount, $shopLimit); + } + + public function moveBufferToSolidRedisQueue($bufferTableName, $queueTableName, $filter, $maxQueueCount = 2000) { + list($realDbId, $bufferTbl) = DbRoute::getDbAndTbl($bufferTableName); + list($realDbId, $queueTbl) = DbRoute::getDbAndTbl($queueTableName); + + $wheres = []; + if ($filter['gmtExec']) { + $wheres[] = $this->db->prepare('and (gmt_exec is null or gmt_exec <= %s)', $filter['gmtExec']); + } + if ($filter['includeShopIds']) { + $wheres[] = $this->db->prepare('AND shop_id IN %li', $filter['includeShopIds']); + } + + $queueCount = $this->db->useDbIdOnce($realDbId)->queryFirstField('select count(*) from %b', $queueTbl); + if ($queueCount > $maxQueueCount) { + return false; + } + + $needMoveQueueCount = $maxQueueCount - $queueCount; + $limit = 100; + $stopFilePath = Zc::C(ZcConfigConst::LogDir) . "stop_do_move_{$bufferTableName}_to_queue"; + $bufferPrimaryField = QueueConst::getQueueIdColumnName($bufferTbl); + $startBufferId = 0; + while (true) { + if (file_exists($stopFilePath)) { + break; + } + if ($needMoveQueueCount <= 0) { + break; + } + + $loopWheres = $wheres; + $loopWheres[] = $this->db->prepare('AND %l > %i', $bufferPrimaryField, $startBufferId); + $whereStr = implode(' ', $loopWheres); + + $loopLimit = $needMoveQueueCount > $limit ? $limit : $needMoveQueueCount; + $bufferList = $this->db->useDbIdOnce($realDbId)->query('select * from %b where 1=1 %l order by %l asc limit %i', $bufferTbl, $whereStr, $bufferPrimaryField, $loopLimit); + if (empty($bufferList)) { + break; + } + $deleteBufferIds = []; + $queueDatas = []; + foreach ($bufferList as $bufferData) { + $startBufferId = $bufferData[$bufferPrimaryField]; + if (!$filter['includeShopIds'] && CommonTool::isCurrAdminSelfShop($bufferData['shop_id'])) { + continue; + } + + $deleteBufferIds[] = $bufferData[$bufferPrimaryField]; + + $tmpBufferData = $bufferData; + unset($tmpBufferData['gmt_exec']); + unset($tmpBufferData[$bufferPrimaryField]); + $tmpBufferData['locked'] = 0; + $tmpBufferData['gmt_create'] = ZcDbEval::now(); + $tmpBufferData['gmt_modified'] = ZcDbEval::now(); + $queueDatas[] = $tmpBufferData; + } + + $oldErrorMode = $this->db->setErrorMode(ZcDb::ERROR_MODE_EXCEPTION); + $transStatus = $this->db->useDbIdOnce($realDbId)->startTransaction(); + try { + if ($queueDatas) { + $this->db->useDbIdOnce($realDbId)->insert($queueTbl, $queueDatas); + $startQueueId = $this->db->lastInsertId(); + } + + if ($deleteBufferIds) { + $this->db->useDbIdOnce($realDbId)->delete($bufferTbl, '%l IN %li', $bufferPrimaryField, $deleteBufferIds); + } + + $this->db->useDbIdOnce($realDbId)->commit($transStatus); + $this->db->setErrorMode($oldErrorMode); + } catch (Exception $e) { + $this->db->useDbIdOnce($realDbId)->rollback($transStatus); + $this->db->setErrorMode($oldErrorMode); + break; + } + + $selfQueueIds = $notSelfQueueIds = []; + if (!empty($startQueueId)) { + $queueCount = count($queueDatas); + $queueIdLoop = 0; + for ($queueId = $startQueueId; $queueId < ($queueCount + $startQueueId); $queueId ++) { + $loopQueueData = $queueDatas[$queueIdLoop]; + + if (CommonTool::isCurrAdminSelfShop($loopQueueData['shop_id'])) { + $selfQueueIds[] = $queueId; + } else { + $notSelfQueueIds[] = $queueId; + } + $queueIdLoop ++; + } + } + Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("loop startBufferId[{$startBufferId}]"); + if ($selfQueueIds) { + $queueRedisKey = RedisKeyConst::getQueueRedisKey($queueTbl, true); + $redisRet = $this->solidRedis->lPush($queueRedisKey, ...$selfQueueIds); + if (false === $redisRet) { + /** @var Exception $ex */ + $ex = $this->solidRedis->lastException(); + $reason = $ex ? $ex->getMessage() : "emptyEx"; + Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("addQueueIdsToSolidRedisQueue[$queueRedisKey] -> reason : " . $reason); + } + } + if ($notSelfQueueIds) { + $queueRedisKey = RedisKeyConst::getQueueRedisKey($queueTbl, false); + $redisRet = $this->solidRedis->lPush($queueRedisKey, ...$notSelfQueueIds); + if (false === $redisRet) { + /** @var Exception $ex */ + $ex = $this->solidRedis->lastException(); + $reason = $ex ? $ex->getMessage() : "emptyEx"; + Zc::getLog('timer/common/move_buffer_to_queue/move_buffer')->info("addQueueIdsToSolidRedisQueue[$queueRedisKey] -> reason : " . $reason); + } + } + + $needMoveQueueCount -= count($deleteBufferIds); + if ($needMoveQueueCount <= 0 || count($bufferList) < $limit) { + break; + } + } + return true; + } +}