cf-platform/Application/Base/Task/Task.class.php

96 lines
2.6 KiB
PHP

<?php
namespace Base\Task;
/**
 * Queue-based task runner backed by the `tab_tasks` table.
 *
 * Each row has a `type` that maps (via self::$types) to a handler class, and
 * a `status` that this class moves through the STATUS_* values below.
 * Queues (self::$queues) are named groups of task types; a runner instance
 * only picks up rows whose type belongs to its queue.
 *
 * @author elf<360197197@qq.com>
 */
class Task
{
    /** Row has not been picked up yet (the value run() selects on). */
    const STATUS_PENDING = 0;
    /** Row has been claimed by run() and is being processed. */
    const STATUS_RUNNING = 1;
    /** Handler finished without throwing. */
    const STATUS_SUCCESS = 2;
    /** Handler class was missing or the handler threw. */
    const STATUS_FAILED = 3;

    /** @var array<string,string> task type => fully-qualified handler class name */
    public static $types = [
        'market-shift' => '\Base\Task\MarketShiftTask',
        'test' => '\Base\Task\TestTask',
    ];

    /** @var array<string,string[]> queue name => list of task types served by that queue */
    public static $queues = [
        'common' => ['test'],
        'market-shift' => ['market-shift']
    ];

    /** @var string name of the queue this runner serves */
    public $queue;

    /** @var array|null rows claimed by the most recent run() call */
    public $tasks;

    /**
     * @param string $queue queue name; must be a key of self::$queues for run() to succeed
     */
    public function __construct($queue = 'common')
    {
        $this->queue = $queue;
    }

    /**
     * Claim up to $count pending tasks of this queue and process them one by one.
     *
     * Every claimed row is first marked STATUS_RUNNING, then each one ends as
     * STATUS_SUCCESS or STATUS_FAILED with a result message. A failure of one
     * task never prevents the remaining claimed tasks from being processed.
     *
     * NOTE(review): selecting and then marking rows is not atomic, so two
     * runners on the same queue could presumably claim the same rows — confirm
     * whether runners are guaranteed to be single-instance per queue.
     *
     * @param int $count maximum number of tasks to claim
     * @return void
     * @throws \Exception when the queue is unknown, has no types, the query
     *                    fails, or there is nothing to process
     */
    public function run($count = 1)
    {
        if (!isset(self::$queues[$this->queue])) {
            throw new \Exception('无此队列');
        }
        $types = self::$queues[$this->queue];
        if (count($types) === 0) {
            throw new \Exception('暂无任务');
        }

        $map = [
            'status' => self::STATUS_PENDING,
            'type' => ['in', $types],
        ];
        $this->tasks = M('tasks', 'tab_')->where($map)->limit($count)->select();

        // select() may hand back a non-array (false / null) instead of an
        // empty list; count() on those either lies (count(false) === 1) or
        // throws a TypeError on PHP 8, so check explicitly.
        if ($this->tasks === false) {
            throw new \Exception('任务查询失败');
        }
        if (empty($this->tasks)) {
            throw new \Exception('暂无任务');
        }

        $this->updateTasks(['status' => self::STATUS_RUNNING, 'start_time' => time()]);

        foreach ($this->tasks as $task) {
            $this->processTask($task);
        }
    }

    /**
     * Resolve the handler class name for a task row.
     *
     * @param array $task task row; only its 'type' key is read
     * @return string|null handler class name, or null when the type is not registered
     */
    public function getTypeClass($task)
    {
        if (!isset(self::$types[$task['type']])) {
            return null;
        }
        return self::$types[$task['type']];
    }

    /**
     * Run a single claimed task and record its outcome on the row.
     *
     * @param array $task task row (reads 'id', 'type', 'params')
     * @return void
     */
    private function processTask($task)
    {
        $class = $this->getTypeClass($task);
        if (is_null($class) || !class_exists($class)) {
            $this->failTask($task, '任务处理类不存在');
            return;
        }

        try {
            $params = json_decode($task['params'], true);
            $obj = new $class($params);
            $obj->run();
            $this->updateTask($task, [
                'status' => self::STATUS_SUCCESS,
                'end_time' => time(),
                'result' => '处理成功',
            ]);
        } catch (\Exception $e) {
            $this->failTask($task, $e->getMessage());
        } catch (\Throwable $e) {
            // PHP 7+ engine errors (TypeError, Error, ...) are not \Exception.
            // Without this branch they would abort the loop and leave this and
            // all remaining claimed rows stuck in STATUS_RUNNING. On PHP 5 the
            // class does not exist and this clause simply never matches.
            $this->failTask($task, $e->getMessage());
        }
    }

    /**
     * Mark one task as failed with the given result message.
     *
     * @param array  $task    task row (reads 'id')
     * @param string $message text stored in the row's 'result' column
     * @return void
     */
    private function failTask($task, $message)
    {
        $this->updateTask($task, [
            'status' => self::STATUS_FAILED,
            'end_time' => time(),
            'result' => $message,
        ]);
    }

    /**
     * Write $data to the row identified by $task['id'].
     *
     * @param array $task task row (reads 'id')
     * @param array $data column => value pairs to save
     * @return void
     */
    private function updateTask($task, $data)
    {
        M('tasks', 'tab_')->where(['id' => $task['id']])->save($data);
    }

    /**
     * Write $data to every row currently held in $this->tasks.
     *
     * @param array $data column => value pairs to save
     * @return void
     */
    private function updateTasks($data)
    {
        $ids = array_column($this->tasks, 'id');
        M('tasks', 'tab_')->where(['id' => ['in', $ids]])->save($data);
    }

    /**
     * Enqueue a new task of a registered type.
     *
     * The row is inserted with type, JSON-encoded params and created_time;
     * no status is set here, so the initial status is whatever the table
     * defaults to.
     *
     * @param string $type   task type; must be a key of self::$types
     * @param array  $params handler parameters, stored JSON-encoded
     * @return mixed false when the type is unknown or the params cannot be
     *               JSON-encoded; otherwise whatever the model's add() returns
     */
    public static function add($type, $params = [])
    {
        if (!isset(self::$types[$type])) {
            return false;
        }

        $encoded = json_encode($params);
        if ($encoded === false) {
            // Unencodable params (e.g. invalid UTF-8) would otherwise be
            // stored as a boolean false and break the handler later.
            return false;
        }

        $record = [
            'type' => $type,
            'params' => $encoded,
            'created_time' => time()
        ];
        return M('tasks', 'tab_')->add($record);
    }
}