<?php

namespace Base\Task;

/**
 * Task processing.
 *
 * Pulls pending rows from the `tasks` table (via the project's `M()` model
 * helper, called with the `tab_` prefix), dispatches each row to the handler
 * class registered for its type, and records the outcome back on the row.
 *
 * Status values written by this class:
 *   0 - pending (the only status selected by run())
 *   1 - claimed / started (start_time is set)
 *   2 - finished successfully
 *   3 - failed (result holds the failure message)
 *
 * @author elf<360197197@qq.com>
 */
class Task
{
    const STATUS_PENDING = 0;
    const STATUS_RUNNING = 1;
    const STATUS_SUCCESS = 2;
    const STATUS_FAILED = 3;

    /**
     * Task type name => fully-qualified handler class name.
     *
     * @var array
     */
    public static $types = [
        'market-shift' => '\Base\Task\MarketShiftTask',
        'test' => '\Base\Task\TestTask',
    ];

    /**
     * Queue name => list of task type names served by that queue.
     *
     * @var array
     */
    public static $queues = [
        'common' => ['test'],
        'market-shift' => ['market-shift'],
    ];

    /** @var string Name of the queue this instance processes. */
    public $queue;

    /** @var mixed Rows fetched by the most recent run() call. */
    public $tasks;

    /**
     * @param string $queue Queue name; must be a key of self::$queues for run() to succeed.
     */
    public function __construct($queue = 'common')
    {
        $this->queue = $queue;
    }

    /**
     * Fetch up to $count pending tasks of this queue's types and process them.
     *
     * All fetched rows are first marked as started, then each one is handed to
     * its handler class (constructed with the JSON-decoded `params` column and
     * invoked through run()). Per-task failures are recorded on the row and do
     * not abort the remaining tasks.
     *
     * NOTE(review): the select and the "mark as started" update are two
     * separate statements, so concurrent workers on the same queue could pick
     * the same rows - confirm whether that can happen in deployment.
     *
     * @param int $count Maximum number of tasks to fetch.
     *
     * @throws \Exception When the queue is unknown ('无此队列') or there is nothing to process ('暂无任务').
     */
    public function run($count = 1)
    {
        if (!isset(self::$queues[$this->queue])) {
            throw new \Exception('无此队列');
        }

        $types = self::$queues[$this->queue];
        if (count($types) === 0) {
            throw new \Exception('暂无任务');
        }

        $map = [
            'status' => self::STATUS_PENDING,
            'type' => ['in', $types],
        ];

        $this->tasks = M('tasks', 'tab_')->where($map)->limit($count)->select();

        // select() may hand back a non-array (null / false) instead of an empty
        // list; count() on such a value raises a TypeError on PHP 8, so use a
        // guard that is safe for every "nothing usable" shape.
        if (empty($this->tasks)) {
            throw new \Exception('暂无任务');
        }

        $this->updateTasks(['status' => self::STATUS_RUNNING, 'start_time' => time()]);

        foreach ($this->tasks as $task) {
            $class = $this->getTypeClass($task);
            if (is_null($class) || !class_exists($class)) {
                $this->failTask($task, '任务处理类不存在');
                continue;
            }

            try {
                $params = json_decode($task['params'], true);
                $obj = new $class($params);
                $obj->run();
                $this->updateTask($task, [
                    'status' => self::STATUS_SUCCESS,
                    'end_time' => time(),
                    'result' => '处理成功',
                ]);
            } catch (\Exception $e) {
                $this->failTask($task, $e->getMessage());
            } catch (\Throwable $e) {
                // PHP 7+ engine errors (TypeError, Error, ...) are not
                // \Exception instances. Without this branch they would escape
                // and leave this task, plus every later task of the batch,
                // stuck in the "started" status. On PHP 5 this branch is
                // simply never matched.
                $this->failTask($task, $e->getMessage());
            }
        }
    }

    /**
     * Resolve the handler class name registered for a task row.
     *
     * @param array $task Task row; its 'type' entry is looked up in self::$types.
     *
     * @return string|null Handler class name, or null when the row has no
     *                     type or the type is not registered.
     */
    public function getTypeClass($task)
    {
        // Checking $task['type'] first avoids an undefined-index notice for
        // malformed rows; the result (null) is the same as before.
        if (!isset($task['type'], self::$types[$task['type']])) {
            return null;
        }

        return self::$types[$task['type']];
    }

    /**
     * Mark a single task as failed with the given message.
     *
     * @param array  $task    Task row (its 'id' identifies the record).
     * @param string $message Text stored in the `result` column.
     */
    private function failTask($task, $message)
    {
        $this->updateTask($task, [
            'status' => self::STATUS_FAILED,
            'end_time' => time(),
            'result' => $message,
        ]);
    }

    /**
     * Save $data onto the row whose id matches $task['id'].
     *
     * @param array $task Task row.
     * @param array $data Column => value pairs to write.
     */
    private function updateTask($task, $data)
    {
        M('tasks', 'tab_')->where(['id' => $task['id']])->save($data);
    }

    /**
     * Save $data onto every row currently held in $this->tasks.
     *
     * @param array $data Column => value pairs to write.
     */
    private function updateTasks($data)
    {
        $ids = array_column($this->tasks, 'id');
        M('tasks', 'tab_')->where(['id' => ['in', $ids]])->save($data);
    }

    /**
     * Enqueue a new task of a registered type.
     *
     * The record is written with `type`, JSON-encoded `params` and
     * `created_time`; no status is set here.
     * NOTE(review): presumably the table default for `status` is 0 (pending),
     * otherwise run() would never pick the row up - confirm against the schema.
     *
     * @param string $type   Task type; must be a key of self::$types.
     * @param array  $params Parameters later passed to the handler's constructor.
     *
     * @return mixed false when $type is not registered, otherwise whatever the
     *               model's add() returns.
     */
    public static function add($type, $params = [])
    {
        if (!isset(self::$types[$type])) {
            return false;
        }

        $record = [
            'type' => $type,
            'params' => json_encode($params),
            'created_time' => time(),
        ];

        return M('tasks', 'tab_')->add($record);
    }
}