setName('timer') ->addOption('temp', null, Option::VALUE_NONE) ->addOption('quit', null, Option::VALUE_NONE) ->addOption('local', null, Option::VALUE_NONE) ->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost') ->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '8000') ->setDescription('内置秒级定时器'); }

    /**
     * Command entry point: resolves the host to call, loads the configured
     * tasks, appends the built-in heartbeat task and starts the timer loop in
     * the mode selected by the `timer.mode` config value.
     *
     * @param Input  $input  console input; the `local`, `local-host` and `local-port` options are read here
     * @param Output $output console output used for progress messages
     * @return void returns early, without starting the timer, when no site domain is configured
     */
    protected function execute(Input $input, Output $output)
    {
        $output->writeln('start timer');

        $site_domain = sysconfig('site', 'site_domain');
        if (empty($site_domain)) {
            $output->writeln('请前往后台设置站点域名(site_domain)配置项');
            return;
        }

        // With --local the requests go to local-host:local-port instead of the configured domain.
        $host = $site_domain;
        if ($input->hasOption('local')) {
            $host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
        }
        $output->writeln('站点域名:' . $host);
        $site_host = parse_url($host, PHP_URL_HOST);

        // Sync the task configuration into the database.
        TimerService::syncConfigToDatabase();

        // Tasks defined by configuration.
        $timer_service = new TimerService();
        $request_list = $timer_service->generateAllRequestList();
        $call_list = $timer_service->generateAllCallList();

        // Built-in node registration (heartbeat) task.
        $system_host_register = [
            'name' => 'system_host_register', // task name, must be unique
            'type' => 'call', // executed through call_user_func rather than an HTTP request
            'target' => [HostService::class, 'heartbeat'], // callable to invoke
            'frequency' => 30, // interval in seconds between two executions
        ];
        $system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register);
        $call_list = array_merge($call_list, $system_host_call_list);

        // Shared with runNormal() / runParallel().
        $this->host = $host;
        $this->siteDomain = $site_domain;
        $this->siteHost = $site_host;
        $this->requestList = $request_list;
        $this->callList = $call_list;

        // 'normal' runs a blocking loop in this process; any other value uses Workerman workers.
        $timer_mode = Config::get('timer.mode', 'normal');
        if ($timer_mode === 'normal') {
            $this->runNormal();
        } else {
            $this->runParallel();
        }
    }

    /**
     * Decides, based on the task's run_type stored in `system_timer_config`,
     * whether the current node should execute the task. Tasks of type `call`
     * are never restricted by this check.
     * Called after throttling and before the task is actually executed.
     *
     * @param array|\ArrayAccess $task_item task instance (name / type / frequency ...)
     * @return bool true = execute, false = skip
     */
    protected function shouldExecuteTask($task_item): bool
    {
        // `call` tasks are not affected by run_type scheduling.
        if (($task_item['type'] ?? '') === 'call') {
            return true;
        }

        $task_name = $task_item['name'] ?? '';

        try {
            $config = Db::name('system_timer_config')
                ->where('task_name', $task_name)
                ->find();
        } catch (\Throwable $e) {
            // A failing lookup must not block the task: fall back to executing it.
            Log::warning('shouldExecuteTask: 查询 system_timer_config 失败 - ' . $e->getMessage());
            return true;
        }

        // No database record: do not block (default behaviour).
        if (empty($config)) {
            return true;
        }

        // status = 0 means the task is disabled.
        if (isset($config['status']) && (int) $config['status'] === 0) {
            return false;
        }

        $run_type = $config['run_type'] ?? 'auto';

        switch ($run_type) {
            case 'main':
                // Only the master node executes.
                $master_node = HostService::getMasterNode();
                $node_id = HostService::getNodeId();
                return ($master_node === $node_id);

            case 'auto':
                // Nodes compete for a row lock; the winner records itself, releases
                // the lock by committing, and only then runs the task.
                Db::startTrans();
                try {
                    $row = Db::name('system_timer_config')
                        ->where('task_name', $task_name)
                        ->lock(true)
                        ->find();

                    if ($row && !empty($row['last_execute_time']) && $row['last_execute_time'] > 0) {
                        if ((time() - (int) $row['last_execute_time']) < ($task_item['frequency'] ?? 0)) {
                            Db::commit();
                            return false; // another node executed it recently
                        }
                    }

                    Db::name('system_timer_config')
                        ->where('task_name', $task_name)
                        ->update([
                            'last_execute_node' => HostService::getNodeId(),
                            'last_execute_time' => time(),
                            'update_time' => time(),
                        ]);
                    Db::commit();
                    return true;
                } catch (\Throwable $e) {
                    Db::rollback();
                    Log::warning('shouldExecuteTask auto mode exception: ' . $e->getMessage());
                    return false;
                }

            case 'all':
                // Every node executes.
                return true;

            case 'manual':
                // Manual trigger: run once when manual_trigger is set, then reset it to 0.
                if (empty($config['manual_trigger'])) {
                    return false;
                }
                try {
                    // Conditional reset: only the node whose UPDATE actually clears the
                    // flag runs the task, so one trigger cannot be consumed twice.
                    $affected = Db::name('system_timer_config')
                        ->where('task_name', $task_name)
                        ->where('manual_trigger', '<>', 0)
                        ->update([
                            'manual_trigger' => 0,
                            'update_time' => time(),
                        ]);
                } catch (\Throwable $e) {
                    Log::warning('shouldExecuteTask manual mode exception: ' . $e->getMessage());
                    return false;
                }
                return $affected > 0;

            default:
                // Unknown run_type: do not block.
                return true;
        }
    }

    /**
     * Runs the timer with Workerman: one worker issues the HTTP request tasks
     * asynchronously, and one additional worker is created per call task.
     * Blocks in Worker::runAll().
     */
    public function runParallel()
    {
        // Rebuild the command-line arguments so they are compatible with Workerman's commands.
        global $argv;
        $argv = ['think', 'start'];

        $host = $this->host;
        $site_host = $this->siteHost;
        $output = $this->output;
        $call_list = $this->callList;

        $worker = new Worker();
        $worker->count = 1;
        $worker->name = 'timer_request';
        $worker->timerRequestList = $this->requestList;
        $worker->onWorkerStart = function () use ($worker, $host, $site_host, $output) {
            $options = [
                'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
                'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
                'connect_timeout' => Config::get('timer.connect_timeout', 86400),
                'timeout' => Config::get('timer.timeout', 86400),
            ];
            $http = new HttpClient($options);

            // Per-task runtime state, keyed by the task's index in the request list.
            // It is kept outside the task items and shared by reference because
            // foreach hands out copies of array items, so flags written to the loop
            // variable (or to a by-value closure capture) would be lost on the next tick.
            $task_state = [];

            Timer::add(1, function () use ($worker, $host, $site_host, $output, $http, &$task_state) {
                foreach ($worker->timerRequestList as $task_key => $request_item) {
                    if (!isset($task_state[$task_key])) {
                        $task_state[$task_key] = [
                            'is_running' => $request_item['is_running'] ?? false,
                            'last_run_time' => $request_item['last_run_time'] ?? 0,
                        ];
                    }

                    // A previous request of this task has not finished yet.
                    if ($task_state[$task_key]['is_running']) {
                        $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
                        $output->writeln('进行中,跳过');
                        continue;
                    }

                    // Throttle: not due yet.
                    if (time() - $task_state[$task_key]['last_run_time'] < $request_item['frequency']) {
                        continue;
                    }

                    $task_state[$task_key]['is_running'] = true;
                    $task_state[$task_key]['last_run_time'] = time();

                    // run_type scheduling check (after throttling, before execution).
                    if (!$this->shouldExecuteTask($request_item)) {
                        $task_state[$task_key]['is_running'] = false;
                        continue;
                    }

                    $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
                    $http->request($host . $request_item['target'], [
                        'headers' => [
                            'Host' => $site_host,
                            'Accept' => 'application/json,text/plain',
                        ],
                        'success' => function ($response) use ($task_key, $output, &$task_state) {
                            $task_state[$task_key]['is_running'] = false;
                            $output->writeln((string) $response->getBody());
                        },
                        'error' => function ($exception) use ($task_key, $output, &$task_state) {
                            $task_state[$task_key]['is_running'] = false;
                            $output->writeln((string) $exception);
                        },
                    ]);
                }
            });
        };

        // Run the call tasks, one worker per task instance.
        foreach ($call_list as $call_item) {
            $worker_call = new Worker();
            $worker_call->count = 1;
            $worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
            $worker_call->onWorkerStart = function () use ($call_item, $output) {
                Timer::add(1, function () use ($call_item, $output) {
                    // TODO: decide whether a task should run through one shared method.
                    // NOTE(review): this uses the 'timer_request_' key prefix and the
                    // 'system_timer' tag, while runNormal() uses 'timer_call_' and
                    // 'call_timer' for call tasks - confirm which one is intended.
                    $cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
                    $cache_tag = 'system_timer';
                    $last_exec_time = Cache::get($cache_key, 0);
                    if ($last_exec_time >= time() - $call_item['frequency']) {
                        return;
                    }
                    Cache::tag($cache_tag)->set($cache_key, time());
                    call_user_func($call_item['target']);
                    $output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']);
                });
            };
        }

        Worker::runAll();
    }

    /**
     * Runs the timer as a blocking loop in the current process: once per
     * second it fires the due request tasks concurrently through Guzzle, waits
     * for them, then runs the due call tasks one after another. With the
     * `temp` option the loop ends after a single pass.
     */
    public function runNormal()
    {
        $host = $this->host;
        $site_host = $this->siteHost;
        $request_list = $this->requestList;
        $output = $this->output;
        $input = $this->input;

        $client = new Client([
            'base_uri' => $host,
            'headers' => [
                'Host' => $site_host,
                'Accept' => 'application/json,text/plain',
            ],
            'verify' => false,
        ]);

        while (true) {
            // Run the request tasks.
            try {
                $list_promises = [];
                foreach ($request_list as $request_item) {
                    $name = $request_item['name'];
                    $instance_key = $name . '_' . $request_item['concurrency_id'];
                    $cache_key = 'timer_request_' . $instance_key;
                    $cache_tag = 'system_timer';
                    $last_exec_time = Cache::get($cache_key, 0);
                    if ($last_exec_time >= time() - $request_item['frequency']) {
                        continue;
                    }
                    Cache::tag($cache_tag)->set($cache_key, time());

                    // run_type scheduling check (after cache throttling, before execution).
                    if (!$this->shouldExecuteTask($request_item)) {
                        continue;
                    }

                    $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
                    // Keyed per instance so that several instances of the same task
                    // do not overwrite each other and all of them are awaited below.
                    $list_promises[$instance_key] = $client->getAsync($request_item['target']);
                }

                if (empty($list_promises)) {
                    if (!$input->hasOption('quit')) {
                        $output->writeln(date('Y-m-d H:i:s') . ' no request');
                    }
                } else {
                    // Wait for every request of this pass.
                    Utils::unwrap($list_promises);
                    $output->writeln(date('Y-m-d H:i:s') . ': request all finished');
                }
            } catch (\Throwable $th) {
                $output->writeln('error:' . $th->getMessage());
                Log::error($th->getMessage());
            }

            // Run the call tasks.
            foreach ($this->callList as $call_item) {
                $name = $call_item['name'];
                $cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id'];
                $cache_tag = 'call_timer';
                $last_exec_time = Cache::get($cache_key, 0);
                if ($last_exec_time >= time() - $call_item['frequency']) {
                    continue;
                }
                Cache::tag($cache_tag)->set($cache_key, time());

                // Isolate each call task: one failing task must not end the timer
                // loop, in line with how request failures are handled above.
                try {
                    call_user_func($call_item['target']);
                    $output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished');
                } catch (\Throwable $th) {
                    $output->writeln('error:' . $th->getMessage());
                    Log::error($th->getMessage());
                }
            }

            if ($input->hasOption('temp')) {
                break;
            }
            sleep(1);
        }
    }
}