You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
honor-dd-light-ds-java/doc/move/dao/class.QueueDao.php

650 lines
26 KiB
PHP

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

<?php
class QueueDao {
/** @var mixed Database handle obtained from Zc::getDb() in the constructor. */
private $db;
/**
 * Timer DAO singleton; this class uses it to delete timer locks and to
 * record heartbeats for locked queue rows.
 *
 * @var TimerDao
 */
private $timerDao;
/** @var mixed Logger for the 'timer/check/revive_queue' channel (from Zc::getLog()). */
private $reviveQueueLog;
/** @var mixed Redis client created by RedisExt::factory('solidCache'); holds the per-queue id lists. */
private $solidRedis;
/**
 * Resolves the collaborators used by this DAO: the database handle, the
 * TimerDao singleton, the revive-queue logger and the 'solidCache' Redis client.
 */
public function __construct() {
    $this->db = Zc::getDb();
    $this->timerDao = Zc::singleton('TimerDao');
    $this->reviveQueueLog = Zc::getLog('timer/check/revive_queue');
    $this->solidRedis = RedisExt::factory('solidCache');
}
/**
 * Inserts a new, unlocked message row into the given queue table.
 *
 * @param string $queueName   Queue table to insert into.
 * @param mixed  $shopId      Value stored in `shop_id`.
 * @param mixed  $app         Value stored in `app`.
 * @param mixed  $accessToken Unused. For security reasons the token is no longer
 *                            persisted; it is fetched live at sync time instead.
 * @param int    $priority    Value stored in `priority`.
 * @return mixed false when the insert returns false, otherwise the last insert id.
 */
public function addQueueMsg($queueName, $shopId, $app, $accessToken, $priority = 100) {
    $newRow = array(
        'shop_id' => $shopId,
        'app' => $app,
        'priority' => $priority,
        'locked' => 0,
        'gmt_create' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now(),
    );
    $inserted = $this->db->insert($queueName, $newRow);
    if ($inserted === false) {
        return false;
    }
    return $this->db->lastInsertId();
}
/**
 * Fetches a single queue row by its id.
 *
 * @param string $queueName Queue table name; also used to resolve the id column.
 * @param int    $queueId   Id of the row to load.
 * @return mixed First matching row as returned by the db layer.
 */
public function getQueueMsg($queueName, $queueId) {
    $idColumn = QueueConst::getQueueIdColumnName($queueName);
    $row = $this->db->queryFirstRow('select * from %b where %b = %i', $queueName, $idColumn, $queueId);
    return $row;
}
/**
 * Releases queue rows whose lock holder stopped sending heartbeats.
 *
 * A row is considered dead when `locked > 0` and `gmt_last_heartbeat` is older
 * than $expiredSeconds. Each dead row is reset to `locked = 0`, its timer lock
 * is deleted through TimerDao, and the outcome is written to the revive log.
 *
 * @param string      $queueName      Logical queue name (routed through DbRoute).
 * @param int         $expiredSeconds Heartbeat age, in seconds, after which a lock is dead.
 * @param string|null $status         Optional `status` value to restrict the scan to.
 * @return int Number of dead rows found (0 when none).
 */
public function unlockTimeoutQueue($queueName, $expiredSeconds = 300, $status = null) {
    $deadTime = date('Y-m-d H:i:s', strtotime("-$expiredSeconds second"));
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $where = $this->db->prepare('WHERE locked > 0 AND gmt_last_heartbeat < %s', $deadTime);
    if ($status) {
        $where .= ' ' . $this->db->prepare('and `status` = %s', $status);
    }
    // Read from the routed table: the UPDATE below (and the sibling
    // unlockSolidRedisDeadQueue) target $realTbl on $realDb, so the SELECT must too.
    $queueList = $this->db->useDbIdOnce($realDb)->query('SELECT * FROM %b %l', $realTbl, $where);
    if (empty($queueList)) {
        return 0;
    }
    // Only the db id of the timer_lock route is needed here.
    list($realTimerDb) = DbRoute::getDbAndTbl('timer_lock');
    $queueIdColumnName = QueueConst::getQueueIdColumnName($queueName);
    foreach ($queueList as $queue) {
        $ar1 = $this->db->useDbIdOnce($realDb)->update($realTbl, array(
            'locked' => 0,
            'gmt_modified' => ZcDbEval::now()
        ), '%b = %i', $queueIdColumnName, $queue[$queueIdColumnName]);
        // `locked` holds the timer lock id that owned the row.
        $ar2 = $this->timerDao->deleteTimerLock($queue['locked'], $realTimerDb, '', $queue['hostname']);
        $this->reviveQueueLog->info('unlockQueue queue ' . ZcArrayHelper::sp($queue) . ", unlockedQueue $ar1, deleteTimerLock: $ar2");
    }
    return count($queueList);
}
/**
 * Releases dead-locked rows of a Redis-backed queue and re-enqueues their ids.
 *
 * Rows with `locked > 0` whose `gmt_last_heartbeat` is older than
 * $expiredSeconds are reset to `locked = 0`; each row id is pushed back onto
 * the queue's Redis list (the list is chosen by whether the row's shop is an
 * admin-self shop), the owning timer lock is deleted, and the result is logged.
 *
 * @param string      $queueName      Logical queue name (routed through DbRoute).
 * @param int         $expiredSeconds Heartbeat age, in seconds, after which a lock is dead.
 * @param string|null $status         Optional `status` value to restrict the scan to.
 * @return int Number of dead rows found (0 when none).
 */
public function unlockSolidRedisDeadQueue($queueName, $expiredSeconds = 300, $status = null) {
    $cutoff = date('Y-m-d H:i:s', strtotime("-$expiredSeconds second"));
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);

    $condition = $this->db->prepare('WHERE locked > 0 AND gmt_last_heartbeat < %s', $cutoff);
    if ($status) {
        $condition = $condition . ' ' . $this->db->prepare('and `status` = %s', $status);
    }

    $deadRows = $this->db->useDbIdOnce($realDb)->query('SELECT * FROM %b %l', $realTbl, $condition);
    if (empty($deadRows)) {
        return 0;
    }

    list($realTimerDb, $realTimerLockTbl) = DbRoute::getDbAndTbl('timer_lock');
    $idColumn = QueueConst::getQueueIdColumnName($queueName);

    foreach ($deadRows as $deadRow) {
        $rowId = $deadRow[$idColumn];

        $unlocked = $this->db->useDbIdOnce($realDb)->update(
            $realTbl,
            array(
                'locked' => 0,
                'gmt_modified' => ZcDbEval::now(),
            ),
            '%b = %i',
            $idColumn,
            $rowId
        );

        // Put the id back on the Redis list so a worker can pick it up again.
        if ($rowId) {
            $isAdminSelf = CommonTool::isCurrAdminSelfShop($deadRow['shop_id']);
            $redisKey = RedisKeyConst::getQueueRedisKey($queueName, $isAdminSelf);
            $this->solidRedis->lPush($redisKey, $rowId);
        }

        $lockDeleted = $this->timerDao->deleteTimerLock($deadRow['locked'], $realTimerDb, '', $deadRow['hostname']);
        $this->reviveQueueLog->info('unlockQueue queue ' . ZcArrayHelper::sp($deadRow) . ", unlockedQueue $unlocked, deleteTimerLock: $lockDeleted");
    }

    return count($deadRows);
}
/**
 * Re-enqueues unlocked queue rows whose ids are missing from Redis.
 *
 * The two Redis lists of the queue (admin-self and regular) are snapshotted
 * before and after paging through every `locked = 0` row; any row id present
 * in neither snapshot is pushed onto the list that matches its shop.
 *
 * @param string      $queueName Logical queue name (routed through DbRoute).
 * @param string|null $status    Optional `status` value to restrict the scan to.
 * @return int Number of unlocked rows scanned (0 when none).
 */
public function unlockSolidRedisSkipQueue($queueName, $status = null) {
    $queueRedisKeySelf = RedisKeyConst::getQueueRedisKey($queueName, true);
    $queueRedisKey = RedisKeyConst::getQueueRedisKey($queueName, false);
    // First snapshot, taken before the DB scan.
    $selfRedisIds = $this->solidRedis->lrange($queueRedisKeySelf, 0, -1) ?: [];
    $redisIds = $this->solidRedis->lrange($queueRedisKey, 0, -1) ?: [];
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $where = '';
    if ($status) {
        $where = $this->db->prepare('AND `status` = %s', $status);
    }
    $queueIdColumnName = QueueConst::getQueueIdColumnName($queueName);
    $startQueueId = 0;
    $limit = 100;
    // Must be defined for the first page, where no id lower bound applies.
    $queueIdWhere = '';
    $allQueueList = array();
    // Keyset pagination: each page starts after the last id of the previous one.
    while (true) {
        if ($startQueueId > 0) {
            $queueIdWhere = $this->db->prepare('AND %b > %i', $queueIdColumnName, $startQueueId);
        }
        $queueList = $this->db->useDbIdOnce($realDb)->query('SELECT * FROM %b WHERE locked = 0 %l %l order by %b asc limit %i', $realTbl, $where, $queueIdWhere, $queueIdColumnName, $limit);
        if (empty($queueList)) {
            break;
        }
        $lastQueue = end($queueList);
        $startQueueId = $lastQueue[$queueIdColumnName];
        $allQueueList = array_merge($allQueueList, $queueList);
        if (count($queueList) < $limit) {
            break;
        }
    }
    if (empty($allQueueList)) {
        return 0;
    }
    // Second snapshot, taken after the scan; the union of both snapshots is
    // treated as "already queued".
    $newSelfRedisIds = $this->solidRedis->lrange($queueRedisKeySelf, 0, -1) ?: [];
    $newRedisIds = $this->solidRedis->lrange($queueRedisKey, 0, -1) ?: [];
    // Flip into a lookup map so membership checks are O(1) per row.
    $queuedIdMap = array_flip(array_merge($selfRedisIds, $newSelfRedisIds, $redisIds, $newRedisIds));
    $adminSelfShopIds = null;
    foreach ($allQueueList as $queue) {
        $queueId = $queue[$queueIdColumnName];
        if (!$queueId || isset($queuedIdMap[$queueId])) {
            continue;
        }
        // Loop-invariant; fetched once, and only if something needs pushing.
        if ($adminSelfShopIds === null) {
            $adminSelfShopIds = CommonTool::adminSelfShopIds();
        }
        if (in_array($queue['shop_id'], $adminSelfShopIds)) {
            $this->solidRedis->lPush($queueRedisKeySelf, $queueId);
        } else {
            $this->solidRedis->lPush($queueRedisKey, $queueId);
        }
    }
    // Count every scanned row, not just the last page fetched by the loop.
    return count($allQueueList);
}
/**
 * Lists all queue rows of one shop, oldest `gmt_modified` first.
 *
 * @param string $queueName Queue table name.
 * @param int    $shopId    Shop whose rows are returned.
 * @return mixed Rows as returned by the db layer.
 */
public function getQueueMsgList($queueName, $shopId) {
    $sql = 'select * from %b where shop_id = %i order by gmt_modified asc';
    return $this->db->query($sql, $queueName, $shopId);
}
/**
 * Sets the priority of a single queue row.
 *
 * @param string $queueName Queue table name; also used to resolve the id column.
 * @param int    $queueId   Id of the row to update.
 * @param int    $priority  New `priority` value.
 * @return mixed Result of the db layer's update call.
 */
public function updateQueuePriority($queueName, $queueId, $priority) {
    $idColumn = QueueConst::getQueueIdColumnName($queueName);
    $changes = array('priority' => $priority);
    return $this->db->update($queueName, $changes, '%b = %i', $idColumn, $queueId);
}
/**
 * Ensures a shop has a row in the queue, lowering its priority value if needed.
 *
 * - No row for the shop: a new unlocked row is inserted.
 * - Row is locked, or its stored priority is already <= $priority: nothing changes.
 * - Otherwise the row's priority is set to $priority.
 *
 * @param string $queueName Logical queue name (routed through DbRoute).
 * @param int    $shopId    Shop to enqueue.
 * @param int    $priority  Requested priority.
 * @return mixed true when nothing had to change, otherwise the result of the
 *               db layer's insert/update call.
 */
public function insertUpdateShopQueue($queueName, $shopId, $priority = 100) {
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $existing = $this->db->useDbIdOnce($realDb)->queryFirstRow("select * FROM %b WHERE shop_id = %i", $realTbl, $shopId);

    if ($existing) {
        $isLocked = $existing['locked'] > 0;
        $priorityAlreadyCovers = $existing['priority'] <= $priority;
        if ($isLocked || $priorityAlreadyCovers) {
            return true;
        }
        return $this->db->useDbIdOnce($realDb)->update(
            $realTbl,
            array(
                'priority' => $priority,
                'gmt_modified' => ZcDbEval::now(),
            ),
            'shop_id = %i',
            $shopId
        );
    }

    return $this->db->useDbIdOnce($realDb)->insert($realTbl, array(
        'shop_id' => $shopId,
        'locked' => 0,
        'priority' => $priority,
        'gmt_create' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now(),
    ));
}
/**
 * Deletes every queue row belonging to a shop.
 *
 * @param string $queueName Logical queue name (routed through DbRoute).
 * @param int    $shopId    Shop whose rows are removed.
 * @return mixed Result of the db layer's delete call.
 */
public function deleteShopQueue($queueName, $shopId) {
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $routedDb = $this->db->useDbIdOnce($realDb);
    return $routedDb->delete($realTbl, 'shop_id = %i', $shopId);
}
/**
 * Locks the next unlocked message of a queue for the given timer lock.
 *
 * The candidate is the first `locked = 0` row ordered by priority, then
 * gmt_modified. Shop include/exclude filters are applied together, except for
 * QueueConst::titleKeywordRsyncQueue, where they are ignored.
 *
 * @param int   $timerLockId Timer lock id written into `locked`.
 * @param array $filter      Must contain 'queueName'; may contain
 *                           'includeShopIds' and/or 'excludeShopIds'.
 * @return mixed The locked row (with `locked` set), the empty query result when
 *               no candidate exists, or an empty array when the lock was lost.
 */
public function lockQueueMsg($timerLockId, $filter) {
    $queueName = $filter['queueName'];
    $conditions = array();
    if ($queueName != QueueConst::titleKeywordRsyncQueue) {
        // Collect both filters; previously the exclude clause overwrote the
        // include clause when a caller supplied both.
        if (!empty($filter['includeShopIds'])) {
            $conditions[] = $this->db->prepare('and shop_id in %li', $filter['includeShopIds']);
        }
        if (!empty($filter['excludeShopIds'])) {
            $conditions[] = $this->db->prepare('and shop_id not in %li', $filter['excludeShopIds']);
        }
    }
    $where = implode(' ', $conditions);
    $row = $this->db->queryFirstRow("select * from %b where `locked` = 0 %l order by priority asc, gmt_modified asc", $queueName, $where);
    if (empty($row)) {
        return $row;
    }
    // Optimistic lock: the update only succeeds while the row is still unlocked.
    $primaryColumn = QueueConst::getQueueIdColumnName($queueName);
    $queueId = $row[$primaryColumn];
    $affectRows = $this->db->update($queueName, array (
        'locked' => $timerLockId,
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now()
    ), "%b = %i and locked = 0", $primaryColumn, $queueId);
    if ($affectRows == 1) {
        $row['locked'] = $timerLockId;
        $this->timerDao->addHeartbeat($timerLockId, $queueId, $queueName);
        return $row;
    }
    // Another worker locked the row first.
    return array ();
}
/**
 * Locks one unlocked message of a randomly chosen shop.
 *
 * A shop is picked at random from the distinct shop ids that currently have
 * unlocked rows (after the include/exclude filters), then one of that shop's
 * unlocked rows is locked optimistically.
 *
 * @param int    $timerLockId Timer lock id written into `locked`.
 * @param string $queueName   Queue table name.
 * @param array  $filter      May contain 'includeShopIds' and/or 'excludeShopIds'.
 * @return array The locked row (with `locked` set), or an empty array when
 *               nothing is available or the lock was lost.
 */
public function lockQueueRandom($timerLockId, $queueName, $filter) {
    $where = array();
    if (!empty($filter['includeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id in %li', $filter['includeShopIds']);
    }
    if (!empty($filter['excludeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id not in %li', $filter['excludeShopIds']);
    }
    $whereString = implode(' ', $where);
    // One placeholder per argument: the former trailing %l had no value bound to it.
    $shopIds = $this->db->queryFirstColumn("select distinct `shop_id` from %b where `locked` = 0 %l", $queueName, $whereString);
    if (empty($shopIds)) {
        return array();
    }
    $shopId = $shopIds[array_rand($shopIds, 1)];
    // The shop was already chosen from the filtered set, so no extra clause is
    // needed here (the former dangling %l had no argument either).
    $queueMsg = $this->db->queryFirstRow('SELECT * FROM %b WHERE `shop_id` = %i AND locked = 0', $queueName, $shopId);
    if (empty($queueMsg)) {
        return array();
    }
    $primaryColumn = QueueConst::getQueueIdColumnName($queueName);
    $queueId = $queueMsg[$primaryColumn];
    // Optimistic lock: only succeeds while the row is still unlocked.
    // NOTE(review): unlike lockQueueMsg this does not set gmt_last_heartbeat,
    // which unlockTimeoutQueue filters on — confirm this is intended.
    $affectRows = $this->db->update($queueName, array (
        'locked' => $timerLockId,
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now()
    ), "%b = %i AND locked = 0", $primaryColumn, $queueId);
    if ($affectRows == 1) {
        $queueMsg['locked'] = $timerLockId;
        $this->timerDao->addHeartbeat($timerLockId, $queueId, $queueName);
        return $queueMsg;
    }
    return array ();
}
/**
 * Deletes a queue row identified by the id stored in $row.
 *
 * @param string $queueName Logical queue name (routed through DbRoute).
 * @param array  $row       Queue row; must contain the queue's id column.
 * @return mixed Result of the db layer's delete call.
 */
public function deleteQueueMsg($queueName, $row) {
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $idColumn = QueueConst::getQueueIdColumnName($queueName);
    $rowId = $row[$idColumn];
    return $this->db->useDbIdOnce($realDb)->delete($realTbl, '%b = %i', $idColumn, $rowId);
}
/**
 * Locks the next unlocked queue row ordered by priority asc, then queue id asc
 * (or gmt_exec asc when a gmtExec filter is given).
 *
 * Supported filter keys, all optional: 'includeShopIds', 'excludeShopIds' and
 * 'gmtExec' (only rows whose gmt_exec is null or not later than this time).
 *
 * @param integer $timerLockId Timer lock id written into `locked`.
 * @param string  $queueName   Logical queue name (routed through DbRoute).
 * @param array   $filter      See above.
 * @return array The locked row (with `locked` set), or an empty array when
 *               nothing is available or the lock was lost.
 */
public function lockPriorityQueue($timerLockId, $queueName, $filter) {
    list ($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $primaryColumn = QueueConst::getQueueIdColumnName($realTbl);
    $sortBy = $primaryColumn;
    $where = array();
    // !empty() keeps the same truthiness test while tolerating absent keys,
    // matching lockVenderAvgQueue.
    if (!empty($filter['includeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id in %li', $filter['includeShopIds']);
    }
    if (!empty($filter['excludeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id not in %li', $filter['excludeShopIds']);
    }
    if (!empty($filter['gmtExec'])) {
        $where[] = $this->db->prepare('and (gmt_exec is null or gmt_exec <= %s)', date('Y-m-d H:i:s', strtotime($filter['gmtExec'])));
        $sortBy = 'gmt_exec';
    }
    $whereString = implode(' ', $where);
    $row = $this->db->useDbIdOnce($realDb)->queryFirstRow("select * from %b where `locked` = 0 %l order by priority asc, %b asc", $realTbl, $whereString, $sortBy);
    if (empty($row)) {
        return array();
    }
    // Optimistic lock: the update only succeeds while the row is still unlocked.
    $queueId = $row[$primaryColumn];
    $affectRows = $this->db->useDbIdOnce($realDb)->update($realTbl, array (
        'locked' => $timerLockId,
        'hostname' => gethostname(),
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now()
    ), "%b = %i and locked = 0", $primaryColumn, $queueId);
    if ($affectRows == 1) {
        $row['locked'] = $timerLockId;
        $this->timerDao->addHeartbeat($timerLockId, $queueId, $queueName);
        return $row;
    }
    return array ();
}
/**
 * Locks the next unlocked queue row in FIFO order: queue id asc, or gmt_exec
 * asc when a gmtExec filter is given.
 *
 * Supported filter keys, all optional: 'includeShopIds', 'excludeShopIds' and
 * 'gmtExec' (only rows whose gmt_exec is null or not later than this time).
 *
 * @param integer $timerLockId Timer lock id written into `locked`.
 * @param string  $queueName   Logical queue name (routed through DbRoute).
 * @param array   $filter      See above.
 * @return array The locked row (with `locked` set), or an empty array when
 *               nothing is available or the lock was lost.
 */
public function lockFifoQueue($timerLockId, $queueName, $filter) {
    list ($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $primaryColumn = QueueConst::getQueueIdColumnName($realTbl);
    $sortBy = $primaryColumn;
    // Must be an array: `$where[] = ...` on a string is a fatal error on
    // PHP >= 7.1, and implode() needs an array even when no filter is set.
    $where = array();
    if (!empty($filter['includeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id in %li', $filter['includeShopIds']);
    }
    if (!empty($filter['excludeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id not in %li', $filter['excludeShopIds']);
    }
    if (!empty($filter['gmtExec'])) {
        $where[] = $this->db->prepare('and (gmt_exec is null or gmt_exec <= %s)', date('Y-m-d H:i:s', strtotime($filter['gmtExec'])));
        $sortBy = 'gmt_exec';
    }
    $whereString = implode(' ', $where);
    $row = $this->db->useDbIdOnce($realDb)->queryFirstRow("select * from %b where `locked` = 0 %l order by %b asc", $realTbl, $whereString, $sortBy);
    if (empty($row)) {
        return array();
    }
    // Optimistic lock: the update only succeeds while the row is still unlocked.
    $queueId = $row[$primaryColumn];
    $affectRows = $this->db->useDbIdOnce($realDb)->update($realTbl, array (
        'locked' => $timerLockId,
        'hostname' => gethostname(),
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now()
    ), "%b = %i and locked = 0", $primaryColumn, $queueId);
    if ($affectRows == 1) {
        $row['locked'] = $timerLockId;
        $this->timerDao->addHeartbeat($timerLockId, $queueId, $queueName);
        return $row;
    }
    return array ();
}
/**
 * Locks one unlocked queue row of a randomly chosen shop, so that work is
 * spread across shops rather than following insertion order.
 *
 * Supported filter keys, all optional: 'includeShopIds', 'excludeShopIds' and
 * 'gmtExec' (only rows whose gmt_exec is null or not later than this time).
 *
 * @param integer $timerLockId Timer lock id written into `locked`.
 * @param string  $queueName   Logical queue name (routed through DbRoute).
 * @param array   $filter      See above.
 * @return array The locked row (with `locked` set), or an empty array when
 *               nothing is available or the lock was lost.
 */
public function lockVenderAvgQueue($timerLockId, $queueName, $filter) {
    $where = array();
    if (!empty($filter['includeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id in %li', $filter['includeShopIds']);
    }
    if (!empty($filter['excludeShopIds'])) {
        $where[] = $this->db->prepare('and shop_id not in %li', $filter['excludeShopIds']);
    }
    if (!empty($filter['gmtExec'])) {
        $where[] = $this->db->prepare('and (gmt_exec is null or gmt_exec <= %s)', date('Y-m-d H:i:s', strtotime($filter['gmtExec'])));
    }
    $whereString = implode(' ', $where);
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $primaryColumn = QueueConst::getQueueIdColumnName($realTbl);
    // One placeholder per argument: the former second %l had no value bound to it.
    $shopIds = $this->db->useDbIdOnce($realDb)->queryFirstColumn("select distinct `shop_id` from %b where `locked` = 0 %l", $realTbl, $whereString);
    if (empty($shopIds)) {
        return array();
    }
    $shopId = $shopIds[array_rand($shopIds, 1)];
    // The sort column is an identifier, so bind it with %b like every other query here.
    $queueMsg = $this->db->useDbIdOnce($realDb)->queryFirstRow('SELECT * FROM %b WHERE `shop_id` = %i AND locked = 0 %l ORDER BY %b ASC', $realTbl, $shopId, $whereString, $primaryColumn);
    if (empty($queueMsg)) {
        return array();
    }
    $queueId = $queueMsg[$primaryColumn];
    // Optimistic lock on the same routed table the row was read from
    // (previously the update targeted $queueName instead of $realTbl).
    $affectRows = $this->db->useDbIdOnce($realDb)->update($realTbl, array (
        'locked' => $timerLockId,
        'hostname' => gethostname(),
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now()
    ), "%b = %i AND locked = 0", $primaryColumn, $queueId);
    if ($affectRows == 1) {
        $queueMsg['locked'] = $timerLockId;
        $this->timerDao->addHeartbeat($timerLockId, $queueId, $queueName);
        return $queueMsg;
    }
    return array ();
}
/**
 * Pops one queue id from the queue's Redis list and locks the matching row.
 *
 * @param integer $timerLockId Timer lock id written into `locked`.
 * @param string  $queueName   Logical queue name (routed through DbRoute).
 * @param boolean $isAdminSelf Selects which of the queue's Redis lists to pop from.
 * @return array The locked row (with `locked` set), or an empty array when the
 *               list is empty, the row is missing/already locked, or the lock was lost.
 */
public function lockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf) {
    $redisKey = RedisKeyConst::getQueueRedisKey($queueName, $isAdminSelf);
    $poppedId = $this->solidRedis->rPop($redisKey);
    if (empty($poppedId)) {
        return array();
    }

    list ($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $idColumn = QueueConst::getQueueIdColumnName($queueName);
    $candidate = $this->db->useDbIdOnce($realDb)->queryFirstRow("select * from %b where `locked` = 0 and %b = %i", $realTbl, $idColumn, $poppedId);
    if (empty($candidate)) {
        return array();
    }

    // From here on use the id as stored in the row.
    $rowId = $candidate[$idColumn];
    $lockFields = array(
        'locked' => $timerLockId,
        'hostname' => gethostname(),
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now(),
    );
    // Optimistic lock: only succeeds while the row is still unlocked.
    $lockedCount = $this->db->useDbIdOnce($realDb)->update($realTbl, $lockFields, "%b = %i and locked = 0", $idColumn, $rowId);
    if ($lockedCount != 1) {
        return array();
    }

    $candidate['locked'] = $timerLockId;
    $this->timerDao->addHeartbeat($timerLockId, $rowId, $queueName);
    return $candidate;
}
/**
 * Pops up to $lockCount queue ids from Redis and locks the matching rows in one update.
 *
 * The call is all-or-nothing from the caller's point of view: rows are only
 * returned when every still-unlocked row that was found got locked; otherwise
 * the mismatch is logged and an empty array is returned.
 *
 * NOTE(review): on a mismatch, any rows the UPDATE did lock keep this
 * $timerLockId and are not reported to the caller — presumably they are
 * recovered later by the dead-queue unlock; confirm.
 *
 * @param integer $timerLockId Timer lock id written into `locked`.
 * @param string  $queueName   Logical queue name (routed through DbRoute).
 * @param boolean $isAdminSelf Selects which of the queue's Redis lists to pop from.
 * @param integer $lockCount   Maximum number of ids to pop.
 * @return array Locked rows (each with `locked` set), or an empty array.
 */
public function batchLockSolidRedisQueue($timerLockId, $queueName, $isAdminSelf, $lockCount) {
    $redisKey = RedisKeyConst::getQueueRedisKey($queueName, $isAdminSelf);

    // Pop ids one by one until the list runs dry or the batch size is reached.
    $poppedIds = [];
    $attempt = 0;
    while ($attempt < $lockCount) {
        $poppedId = $this->solidRedis->rPop($redisKey);
        if (!$poppedId) {
            break;
        }
        $poppedIds[] = $poppedId;
        $attempt++;
    }
    if (!$poppedIds) {
        return [];
    }

    list ($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $idColumn = QueueConst::getQueueIdColumnName($queueName);
    $candidates = $this->db->useDbIdOnce($realDb)->query("select * from %b where `locked` = 0 and %b in %li", $realTbl, $idColumn, $poppedIds);
    if (empty($candidates)) {
        return [];
    }

    $candidateIds = array_column($candidates, $idColumn);
    $lockFields = array(
        'locked' => $timerLockId,
        'hostname' => gethostname(),
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now(),
    );
    $lockedCount = $this->db->useDbIdOnce($realDb)->update($realTbl, $lockFields, "%b in %li and locked = 0", $idColumn, $candidateIds);

    if (count($candidateIds) != $lockedCount) {
        Zc::getLog("timer/batch_lock_fail/{$queueName}")->info("find queueIds[" . implode(',', $candidateIds) . "] lock affectRows[" . $lockedCount . "]");
        return [];
    }

    foreach ($candidates as $key => $candidate) {
        $candidates[$key]['locked'] = $timerLockId;
    }
    $this->timerDao->updateTimerLockHeartbeat($timerLockId, null);
    return $candidates;
}
/**
 * Pops one buffer id from the queue's Redis list and locks the queue row
 * whose $bufferPrimaryColumn equals that id.
 *
 * @param integer $timerLockId         Timer lock id written into `locked`.
 * @param string  $queueName           Logical queue name (routed through DbRoute).
 * @param boolean $isAdminSelf         Selects which of the queue's Redis lists to pop from.
 * @param string  $bufferPrimaryColumn Column of the queue table matched against the popped id.
 * @return array The locked row (with `locked` set), or an empty array when the
 *               list is empty, no unlocked row matches, or the lock was lost.
 */
public function lockSolidRedisQueueByBufferId($timerLockId, $queueName, $isAdminSelf, $bufferPrimaryColumn) {
    $redisKey = RedisKeyConst::getQueueRedisKey($queueName, $isAdminSelf);
    $poppedBufferId = $this->solidRedis->rPop($redisKey);
    if (empty($poppedBufferId)) {
        return array();
    }

    list ($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $candidate = $this->db->useDbIdOnce($realDb)->queryFirstRow("select * from %b where `locked` = 0 and %b = %i", $realTbl, $bufferPrimaryColumn, $poppedBufferId);
    if (empty($candidate)) {
        return array();
    }

    // The lock itself is keyed on the queue's own id column, not the buffer column.
    $idColumn = QueueConst::getQueueIdColumnName($queueName);
    $rowId = $candidate[$idColumn];
    $lockFields = array(
        'locked' => $timerLockId,
        'hostname' => gethostname(),
        'gmt_last_heartbeat' => ZcDbEval::now(),
        'gmt_locked' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now(),
    );
    $lockedCount = $this->db->useDbIdOnce($realDb)->update($realTbl, $lockFields, "%b = %i and locked = 0", $idColumn, $rowId);
    if ($lockedCount != 1) {
        return array();
    }

    $candidate['locked'] = $timerLockId;
    $this->timerDao->addHeartbeat($timerLockId, $rowId, $queueName);
    return $candidate;
}
/**
 * Writes one row to the redis lock error log table (TblConst::redis_lock_error_log).
 *
 * @param string $queueName    Stored in `queue_name`.
 * @param int    $timerLockId  Stored in `timer_lock_id`.
 * @param mixed  $primaryKeyId Stored in `primary_key_id`.
 * @param string $sql          Stored in `sql`.
 * @param string $reason       Stored in `reason`.
 * @return mixed Result of the db layer's insert call.
 */
public function addMonitorRedisLockErrorLog($queueName, $timerLockId, $primaryKeyId, $sql, $reason) {
    list($logDbId, $realLogTbl) = DbRoute::getDbAndTbl(TblConst::redis_lock_error_log);
    $logRow = array(
        'queue_name' => $queueName,
        'timer_lock_id' => $timerLockId,
        'primary_key_id' => $primaryKeyId,
        'sql' => $sql,
        'reason' => $reason,
        'hostname' => gethostname(),
        'gmt_create' => ZcDbEval::now(),
        'gmt_modified' => ZcDbEval::now(),
    );
    return $this->db->useDbIdOnce($logDbId)->insert($realLogTbl, $logRow);
}
/**
 * Decides how many buffer rows of each shop may be moved into the queue,
 * spreading the available capacity across shops.
 *
 * Capacity is $maxQueueCount minus the current (filtered) queue row count.
 * When $shopLimit is numeric, each shop is additionally capped so that its
 * queue rows do not exceed that limit.
 *
 * @param string   $bufferName    Logical buffer table name.
 * @param string   $queueName     Logical queue table name.
 * @param array    $filter        Reads 'includeShopIds', 'excludeShopIds' and 'gmtExec'.
 * @param int      $maxQueueCount Upper bound on queue rows.
 * @param int|null $shopLimit     Optional per-shop cap on queue rows.
 * @return array Map of shop id => number of buffer rows to move (empty when no capacity).
 */
public function allocBufferToQueueByShopAvg($bufferName, $queueName, $filter, $maxQueueCount = 300, $shopLimit = null) {
    $currentQueueCnt = $this->getQueueCnt($queueName, $filter);
    $remainingCapacity = $maxQueueCount - $currentQueueCnt;
    if ($remainingCapacity < 1) {
        return array();
    }

    $bufferCntByShop = $this->getShopIdAndBufferCntMap($bufferName, $filter);
    if (is_numeric($shopLimit)) {
        $bufferCntByShop = $this->rebuildShopIdAndCntMapByShopLimit($bufferCntByShop, $queueName, $shopLimit);
    }

    return $this->allotShopMoveBufferCnt($bufferCntByShop, $remainingCapacity);
}
/**
 * Counts the queue rows matching the shop include/exclude filter.
 *
 * @param string $queueName Logical queue name (routed through DbRoute).
 * @param array  $filter    Reads 'includeShopIds' and 'excludeShopIds'.
 * @return mixed COUNT(*) value as returned by the db layer.
 */
private function getQueueCnt($queueName, $filter) {
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $shopCondition = $this->getFilterShopIdWhere($filter['includeShopIds'], $filter['excludeShopIds']);
    $routedDb = $this->db->useDbIdOnce($realDb);
    return $routedDb->queryFirstField("SELECT COUNT(*) FROM %b WHERE %l", $realTbl, $shopCondition);
}
/**
 * Counts buffer rows per shop for the rows matching the filter.
 *
 * @param string $bufferName Logical buffer table name (routed through DbRoute).
 * @param array  $filter     Reads 'includeShopIds', 'excludeShopIds' and 'gmtExec'.
 * @return mixed Result of ZcArrayHelper::mapNameValue() over (shop_id, cnt) rows —
 *               presumably a shop_id => cnt map.
 */
private function getShopIdAndBufferCntMap($bufferName, $filter) {
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($bufferName);
    $condition = $this->getFilterShopIdWhere($filter['includeShopIds'], $filter['excludeShopIds'], $filter['gmtExec']);
    $perShopCounts = $this->db->useDbIdOnce($realDb)->query(
        "SELECT shop_id, COUNT(*) AS cnt FROM %b WHERE %l GROUP BY shop_id",
        $realTbl,
        $condition
    );
    return ZcArrayHelper::mapNameValue($perShopCounts, 'shop_id', 'cnt');
}
/**
 * Builds the body of a WHERE clause (without the WHERE keyword) for shop filtering.
 *
 * The include list takes precedence: the exclude list is only used when the
 * include list is empty. With neither, the condition is '1 = 1'. When $gmtExec
 * is given, ' and gmt_exec <= <time>' is appended.
 *
 * @param array|null  $includeShopIds Shop ids to restrict to.
 * @param array|null  $excludeShopIds Shop ids to leave out.
 * @param string|null $gmtExec        Optional time, parsed with strtotime().
 * @return string SQL condition fragment.
 */
public function getFilterShopIdWhere($includeShopIds, $excludeShopIds, $gmtExec = null) {
    $execSuffix = '';
    if ($gmtExec) {
        $execTime = date('Y-m-d H:i:s', strtotime($gmtExec));
        $execSuffix = ' ' . $this->db->prepare('and gmt_exec <= %s', $execTime);
    }

    if (!empty($includeShopIds)) {
        $shopCondition = $this->db->prepare('shop_id IN %li', $includeShopIds);
    } elseif (!empty($excludeShopIds)) {
        $shopCondition = $this->db->prepare('shop_id not in %li', $excludeShopIds);
    } else {
        $shopCondition = '1 = 1';
    }

    return $shopCondition . $execSuffix;
}
/**
 * Caps each shop's movable buffer count so its queue rows stay within $shopLimit.
 *
 * For every shop the allowance is min(buffer count, $shopLimit - rows already
 * in the queue); shops whose allowance is below 1 are dropped from the map.
 *
 * @param array  $shopIdAndCntMap Map of shop id => buffer row count.
 * @param string $queueName       Logical queue name (routed through DbRoute).
 * @param mixed  $shopLimit       Per-shop cap; the map is returned untouched when not numeric.
 * @return array Map of shop id => capped count.
 */
private function rebuildShopIdAndCntMapByShopLimit($shopIdAndCntMap, $queueName, $shopLimit) {
    if (empty($shopIdAndCntMap) || !is_numeric($shopLimit)) {
        return $shopIdAndCntMap;
    }
    list($realDb, $realTbl) = DbRoute::getDbAndTbl($queueName);
    $queueStatList = $this->db->useDbIdOnce($realDb)->query("SELECT shop_id, COUNT(*) AS cnt FROM %b GROUP BY shop_id", $realTbl);
    $shopIdAndQueueCntMap = ZcArrayHelper::mapNameValue($queueStatList, 'shop_id', 'cnt');
    foreach ($shopIdAndCntMap as $shopId => $cnt) {
        // A shop with no rows in the queue yet is absent from the stats map.
        $queuedCnt = isset($shopIdAndQueueCntMap[$shopId]) ? $shopIdAndQueueCntMap[$shopId] : 0;
        $shopAllowMoveCnt = min($cnt, $shopLimit - $queuedCnt);
        if ($shopAllowMoveCnt < 1) {
            unset($shopIdAndCntMap[$shopId]);
            continue;
        }
        $shopIdAndCntMap[$shopId] = $shopAllowMoveCnt;
    }
    return $shopIdAndCntMap;
}
/**
 * Distributes $allowMoveCnt slots across shops round-robin, one slot per shop
 * per pass, in a random shop order, never giving a shop more than it has waiting.
 *
 * @param array $shopIdAndCntMap Map of shop id => number of rows waiting to move.
 * @param int   $allowMoveCnt    Total number of slots available.
 * @return array Map of shop id => slots granted; shops that received nothing are absent.
 */
private function allotShopMoveBufferCnt($shopIdAndCntMap, $allowMoveCnt){
    $shopIdAndMoveCntMap = array();
    $waitSelectShopIds = array_keys($shopIdAndCntMap);
    // Randomise the order so no shop is systematically served first.
    shuffle($waitSelectShopIds);
    // Each pass hands at most one slot to every shop that still has rows waiting.
    while (!empty($waitSelectShopIds) && $allowMoveCnt >= 1) {
        foreach ($waitSelectShopIds as $index => $shopId) {
            if ($allowMoveCnt < 1) {
                break;
            }
            if ($shopIdAndCntMap[$shopId] < 1) {
                // Shop is exhausted; drop it from later passes.
                unset($waitSelectShopIds[$index]);
                continue;
            }
            // Initialise before incrementing to avoid reading an unset key.
            if (!isset($shopIdAndMoveCntMap[$shopId])) {
                $shopIdAndMoveCntMap[$shopId] = 0;
            }
            $shopIdAndMoveCntMap[$shopId]++;
            $shopIdAndCntMap[$shopId]--;
            $allowMoveCnt--;
        }
    }
    return $shopIdAndMoveCntMap;
}
}