setName('timer') ->addOption('temp', null, Option::VALUE_NONE) ->addOption('quit', null, Option::VALUE_NONE) ->addOption('local', null, Option::VALUE_NONE) ->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost') ->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '80') ->setDescription('内置秒级定时器'); } protected function execute(Input $input, Output $output) { // 指令输出 $output->writeln('start timer'); $site_domain = sysconfig('site', 'site_domain'); if (empty($site_domain)) { $output->writeln('请前往后台设置站点域名(site_domain)配置项'); return; } $host = $site_domain; if ($input->hasOption('local')) { $host = $input->getOption('local-host') . ':' . $input->getOption('local-port'); } $output->writeln('站点域名:' . $host); $site_host = parse_url($host, PHP_URL_HOST); // 设置配置的任务 $timer_service = new TimerService(); $request_list = $timer_service->generateAllRequestList(); $call_list = $timer_service->generateAllCallList(); // 内置的节点注册任务 $system_host_register = [ 'name' => 'system_host_register', // 定时任务的名称,不能重复 'type' => 'call', // 定时任务的类型,默认只支持site,你也可以重写定时器命令行以支持其他命令 'target' => [HostService::class, 'heartbeat'], // 要访问的地址,如果不是以https开头,那么以后台的系统配置中读取相关配置,如果没有配置则不执行 'frequency' => 30, // 执行频率,单位:秒,填写10,则每10秒过后执行一次 ]; $system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register); $call_list = array_merge($call_list, $system_host_call_list); $this->host = $host; $this->siteDomain = $site_domain; $this->siteHost = $site_host; $this->requestList = $request_list; $this->callList = $call_list; $timer_mode = Config::get('timer.mode', 'normal'); if ($timer_mode == 'normal') { $this->runNormal(); } else { $this->runParallel(); } } public function runParallel() { // 重新构造命令行参数,以便兼容workerman的命令 global $argv; $argv = []; array_unshift($argv, 'think', 'start'); $host = $this->host; $site_host = $this->siteHost; $output = $this->output; $input = $this->input; $call_list = $this->callList; $worker = new Worker(); $worker->count = 1; $worker->name = 'timer_request'; $worker->timerRequestList = $this->requestList; $worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) { $options = [ 'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000), 'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400), 'connect_timeout' => Config::get('timer.connect_timeout', 86400), 'timeout' => Config::get('timer.timeout', 86400), ]; $http = new HttpClient($options); Timer::add(1, function () use ($worker, $host, $site_host, $output, $input, $http) { $request_list = $worker->timerRequestList; foreach ($request_list as $request_item) { $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']); if (!isset($request_item['is_running'])) { $request_item['is_running'] = false; } if ($request_item['is_running']) { $output->writeln('进行中,跳过'); continue; } if (!isset($request_item['last_run_time'])) { $request_item['last_run_time'] = 0; } if (time() - $request_item['last_run_time'] < $request_item['frequency']) { continue; } $request_item['is_running'] = true; $request_item['last_run_time'] = time(); // $http->request($host . $request_item['target'], [ 'headers' => [ 'Host' => $site_host, 'Accept' => 'application/json,text/plain', ], 'success' => function ($response) use ($request_item, $output) { $request_item['is_running'] = false; $output->writeln((string) $response->getBody()); }, 'error' => function ($exception) use ($request_item, $output) { $request_item['is_running'] = false; $output->writeln($exception); }, ]); } }); }; // 运行call任务 foreach ($call_list as $call_item) { $worker_call = new Worker(); $worker_call->count = 1; $worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id']; $worker_call->onWorkerStart = function () use ($worker_call, $call_item, $output, $input) { Timer::add(1, function () use ($worker_call, $call_item, $output, $input) { // TODO:统一通过相同的方法判断任务是否该执行 $cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id']; $cache_tag = 'system_timer'; $last_exec_time = Cache::get($cache_key, 0); if ($last_exec_time >= time() - $call_item['frequency']) { return; } Cache::tag($cache_tag)->set($cache_key, time()); call_user_func($call_item['target']); $output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']); }); }; } Worker::runAll(); } public function runNormal() { $host = $this->host; $site_host = $this->siteHost; $request_list = $this->requestList; $output = $this->output; $input = $this->input; $client = new Client([ 'base_uri' => $host, 'headers' => [ 'Host' => $site_host, 'Accept' => 'application/json,text/plain', ], 'verify' => false, ]); while (true) { // 运行请求任务 try { $list_promises = []; foreach ($request_list as $request_item) { $name = $request_item['name']; $cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id']; $cache_tag = 'system_timer'; $last_exec_time = Cache::get($cache_key, 0); if ($last_exec_time >= time() - $request_item['frequency']) { continue; } Cache::tag($cache_tag)->set($cache_key, time()); $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']); $list_promises[$request_item['name']] = $client->getAsync($request_item['target']); } if (empty($list_promises)) { if (!$input->hasOption('quit')) { $output->writeln(date('Y-m-d H:i:s') . ' no request'); } } else { $results = Utils::unwrap($list_promises); $output->writeln(date('Y-m-d H:i:s') . ': request all finished'); } } catch (\Throwable $th) { // throw $th; $output->writeln('error:' . $th->getMessage()); Log::error($th->getMessage()); } // 运行call任务 $call_list = $this->callList; foreach ($call_list as $call_item) { $name = $call_item['name']; $cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id']; $cache_tag = 'call_timer'; $last_exec_time = Cache::get($cache_key, 0); if ($last_exec_time >= time() - $call_item['frequency']) { continue; } Cache::tag($cache_tag)->set($cache_key, time()); call_user_func($call_item['target']); $output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished'); } if ($input->hasOption('temp')) { break; } sleep(1); } } }