setName('timer') ->addOption('temp', null, Option::VALUE_NONE) ->addOption('local', null, Option::VALUE_NONE) ->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost') ->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '8000') ->setDescription('内置秒级定时器'); } protected function execute(Input $input, Output $output) { try { // 指令输出 $output->writeln('start timer'); $site_domain = sysconfig('site', 'site_domain'); if (empty($site_domain)) { $output->writeln('请前往后台设置站点域名(site_domain)配置项'); return; } $host = $site_domain; if ($input->hasOption('local')) { $host = $input->getOption('local-host') . ':' . $input->getOption('local-port'); } $output->writeln('站点域名:' . $host); $site_host = parse_url($host, PHP_URL_HOST); // 同步配置到数据库 TimerService::syncConfigToDatabase(); // 设置配置的任务 $timer_service = new TimerService(); $request_list = $timer_service->generateAllRequestList(); $call_list = $timer_service->generateAllCallList(); // 内置的节点注册任务 $system_host_register = [ 'name' => 'system_host_register', // 定时任务的名称,不能重复 'type' => 'call', // 定时任务的类型,默认只支持site,你也可以重写定时器命令行以支持其他命令 'target' => [HostService::class, 'heartbeat'], // 要访问的地址,如果不是以https开头,那么以后台的系统配置中读取相关配置,如果没有配置则不执行 'frequency' => 30, // 执行频率,单位:秒,填写10,则每10秒过后执行一次 ]; $system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register); $call_list = array_merge($call_list, $system_host_call_list); $this->host = $host; $this->siteDomain = $site_domain; $this->siteHost = $site_host; $this->requestList = $request_list; $this->callList = $call_list; // 运行定时器 $this->runLoop(); } catch (\Throwable $e) { throw $e; } } /** * 基于 run_type 判断当前节点是否应执行该任务(仅对 site 类型生效). * 在 Cache 节流之后、实际执行之前调用。 * * @param array|\ArrayAccess $task_item 任务实例(含 name/type/frequency 等) * @return bool true=应执行 false=跳过 */ protected function shouldExecuteTask($task_item): bool { // call 类型任务不受 run_type 调度层影响 if (($task_item['type'] ?? '') === 'call') { return true; } $task_name = $task_item['name'] ?? ''; try { $config = Db::name('system_timer_config') ->where('task_name', $task_name) ->find(); } catch (\Throwable $e) { Log::warning('shouldExecuteTask: 查询 system_timer_config 失败 - ' . $e->getMessage()); return true; } // 无数据库记录时不阻塞(回退到默认行为) if (empty($config)) { return true; } // status=0 表示任务已禁用 if (isset($config['status']) && (int) $config['status'] === 0) { return false; } $run_type = $config['run_type'] ?? 'auto'; switch ($run_type) { case 'main': // 仅主节点执行 $master_node = HostService::getMasterNode(); $node_id = HostService::getNodeId(); return ($master_node === $node_id); case 'auto': // 两阶段 DB 行锁竞争:竞争成功后释放锁,再执行任务 Db::startTrans(); try { $row = Db::name('system_timer_config') ->where('task_name', $task_name) ->lock(true) ->find(); if ($row && !empty($row['last_execute_time']) && $row['last_execute_time'] > 0) { if ((time() - (int) $row['last_execute_time']) < ($task_item['frequency'] ?? 0)) { Db::commit(); return false; // 另一个节点刚执行过 } } Db::name('system_timer_config') ->where('task_name', $task_name) ->update([ 'last_execute_node' => HostService::getNodeId(), 'last_execute_time' => time(), 'update_time' => time(), ]); Db::commit(); return true; } catch (\Throwable $e) { Db::rollback(); Log::warning('shouldExecuteTask auto mode exception: ' . $e->getMessage()); return false; } case 'all': // 所有节点都执行 return true; case 'manual': // 手动触发:检查 manual_trigger=1,执行一次后重置为 0 if (!empty($config['manual_trigger'])) { Db::name('system_timer_config') ->where('task_name', $task_name) ->update([ 'manual_trigger' => 0, 'update_time' => time(), ]); return true; } return false; default: // 未知 run_type 不阻塞 return true; } } public function runLoop() { $host = $this->host; $site_host = $this->siteHost; $request_list = $this->requestList; $output = $this->output; $input = $this->input; $handler = new CurlMultiHandler([ 'select_timeout' => Config::get('timer.select_timeout', 0.001), 'max_handles' => Config::get('timer.max_handles', 100), ]); $stack = HandlerStack::create($handler); $client = new Client([ 'handler' => $stack, 'base_uri' => $host, 'timeout' => Config::get('timer.timeout', 86400), 'connect_timeout' => Config::get('timer.connect_timeout', 30), 'headers' => [ 'Host' => $site_host, 'Accept' => 'application/json,text/plain', ], 'verify' => false, ]); $pending = []; while (true) { try { $has_new_task = false; // --- site 任务:非阻塞发火 --- foreach ($request_list as $request_item) { $name = $request_item['name']; $key = $name . '_' . $request_item['concurrency_id']; // 已在飞 -> 跳过 if (isset($pending[$key])) { continue; } $cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id']; $cache_tag = 'system_timer'; $last_exec_time = Cache::get($cache_key, 0); if ($last_exec_time >= time() - $request_item['frequency']) { continue; } Cache::tag($cache_tag)->set($cache_key, time()); // run_type 调度检查(Cache 节流之后、实际执行之前) if (!$this->shouldExecuteTask($request_item)) { continue; } $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']); $promise = $client->getAsync($request_item['target']); $pending[$key] = $promise; $has_new_task = true; $promise->then( function ($response) use ($key, &$pending, $output) { unset($pending[$key]); $output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' finished'); }, function ($reason) use ($key, &$pending, $output) { unset($pending[$key]); $output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' error: ' . $reason->getMessage()); } ); } // --- 非阻塞推进 curl --- $handler->tick(); Utils::queue()->run(); // --- 空闲提示 --- if (empty($pending) && !$has_new_task) { if (!$input->hasOption('quiet')) { $output->writeln(date('Y-m-d H:i:s') . ' no request'); } } } catch (\Throwable $th) { $output->writeln('error:' . $th->getMessage()); Log::error($th->getMessage()); } // 运行call任务 $call_list = $this->callList; foreach ($call_list as $call_item) { $name = $call_item['name']; $cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id']; $cache_tag = 'call_timer'; $last_exec_time = Cache::get($cache_key, 0); if ($last_exec_time >= time() - $call_item['frequency']) { continue; } Cache::tag($cache_tag)->set($cache_key, time()); call_user_func($call_item['target']); $output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished'); } if ($input->hasOption('temp')) { if (!empty($pending)) { try { Utils::unwrap($pending); } catch (\Throwable $th) { $output->writeln('error:' . $th->getMessage()); Log::error($th->getMessage()); } } break; } // --- sleep 策略 --- if (empty($pending)) { usleep(200000); } else { usleep(50000); } } } }