diff --git a/.agents/skills/ulthon-timer/SKILL.md b/.agents/skills/ulthon-timer/SKILL.md index c340514..df9c202 100644 --- a/.agents/skills/ulthon-timer/SKILL.md +++ b/.agents/skills/ulthon-timer/SKILL.md @@ -1,6 +1,6 @@ ---- +--- name: "ulthon-timer" -description: "内置秒级定时器(php think timer)的使用与扩展规范;用于新增/调整定时任务(site/call、并发分片、TimerController 防刷、timer.mode normal/parallel)。" +description: "内置秒级定时器(php think timer)的使用与扩展规范;用于新增/调整定时任务(site/call、并发分片、TimerController 防刷)。" --- # timer(内置秒级定时器) @@ -21,7 +21,7 @@ description: "内置秒级定时器(php think timer)的使用与扩展规范 - [Timer.php](../../../app/common/command/Timer.php) / [TimerBase.php](../../../extend/base/common/command/TimerBase.php) - [TimerService.php](../../../app/common/service/TimerService.php) / [TimerServiceBase.php](../../../extend/base/common/service/TimerServiceBase.php) - [TimerController.php](../../../app/common/controller/TimerController.php) / [TimerControllerBase.php](../../../extend/base/common/controller/TimerControllerBase.php) - - 运行模式配置:[timer.php](../../../config/timer.php) + - 运行配置:[timer.php](../../../config/timer.php) ## 新增定时任务(默认规则) @@ -182,7 +182,7 @@ return [ - 常规运行:`php think timer` - 只跑一轮(便于验证):`php think timer --temp` -- 无任务时不输出“no request”:`php think timer --quit` +- 无任务时不输出“no request”:`php think timer --quiet` ### 本地调试(指定请求 Host) @@ -192,20 +192,21 @@ site 任务会按站点域名发起请求,默认从 `sysconfig('site','site_do ### 运行模式 -配置在 [timer.php](../../../config/timer.php): +配置在 [timer.php](../../../config/timer.php)。定时器使用 Guzzle CurlMultiHandler 实现非阻塞异步事件循环: -- `normal`:单进程循环 + Guzzle async(默认) -- `parallel`:Workerman 多进程模式(并发更高,相关连接参数在 `timer.php` 中) +- `site` 类型任务通过 curl multi 并行发送 HTTP 请求,真正非阻塞 +- `call` 类型任务在主循环中同步执行 +- `pending` 数组追踪进行中的请求,`handler->tick()` 非阻塞推进 +- 自适应 sleep 策略(50ms/200ms)避免 CPU 空转 ### 配置项说明 | 配置键 | 默认值 | 说明 | |--------|--------|------| -| `mode` | `normal` | 运行模式(`normal` / `parallel`) | -| `max_conn_per_addr` | `128` | 每个域名最多维持的并发连接数(仅多进程模式生效) | -| `keepalive_timeout` | `86400` | 连接不通讯自动关闭时间(秒) | -| `connect_timeout` | `86400` | 连接超时时间(秒) | +| `connect_timeout` | `30` | 连接超时时间(秒) | | `timeout` | `86400` | 请求响应超时时间(秒) | +| `max_handles` | `100` | curl multi 最大并发句柄数 | +| `select_timeout` | `0.001` | curl_multi_select 超时(秒) | | `clear_log_days` | `3` | ClearLog 任务清理 debug_log 表的保留天数,支持从 `.env` 的 `TIMER_CLEAR_LOG_DAYS` 覆盖 | ## 常见坑位(快速自检) diff --git a/config/timer.php b/config/timer.php index 302f4b4..92c64f4 100644 --- a/config/timer.php +++ b/config/timer.php @@ -2,18 +2,18 @@ use think\facade\Env; -// 配置参考:https://doc.ulthon.com/read/augushong/ulthon_admin/timer-mode/zh-cn/2.x.html +// 定时器配置(Guzzle CurlMultiHandler 非阻塞模式) $config = [ - 'mode' => 'normal', + // Guzzle Client 连接配置 + 'connect_timeout' => 30, // 连接超时时间(秒) + 'timeout' => 86400, // 请求响应超时时间(秒) - // 目前仅对多进程模式生效,暂不支持设置为0(不限制) - 'max_conn_per_addr' => 128, // 每个域名最多维持多少并发连接 - 'keepalive_timeout' => 86400, // 连接多长时间不通讯就关闭 - 'connect_timeout' => 86400, // 连接超时时间 - 'timeout' => 86400, // 请求发出后等待响应的超时时间 + // CurlMultiHandler 配置 + 'max_handles' => 100, // curl multi 最大并发句柄数 + 'select_timeout' => 0.001, // curl_multi_select 超时(秒) // 清理日志保留天数(debug_log 表) - 'clear_log_days' => Env::get('timer.clear_log_days', 3), + 'clear_log_days' => Env::get('timer.clear_log_days', 3), ]; return $config; diff --git a/extend/base/common/command/TimerBase.php b/extend/base/common/command/TimerBase.php index 36d6b4b..ca7dd47 100644 --- a/extend/base/common/command/TimerBase.php +++ b/extend/base/common/command/TimerBase.php @@ -16,9 +16,8 @@ use think\facade\Cache; use think\facade\Config; use think\facade\Db; use think\facade\Log; -use Workerman\Http\Client as HttpClient; -use Workerman\Timer; -use Workerman\Worker; +use GuzzleHttp\Handler\CurlMultiHandler; +use GuzzleHttp\HandlerStack; class TimerBase extends Command { @@ -35,7 +34,6 @@ class TimerBase extends Command // 指令配置 $this->setName('timer') ->addOption('temp', null, Option::VALUE_NONE) - ->addOption('quiet', null, Option::VALUE_NONE) ->addOption('local', null, Option::VALUE_NONE) ->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost') ->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '8000') @@ -89,13 +87,8 @@ class TimerBase extends Command $this->requestList = $request_list; $this->callList = $call_list; - // 文本模式:正常运行定时器 - $timer_mode = Config::get('timer.mode', 'normal'); - if ($timer_mode == 'normal') { - $this->runNormal(); - } else { - $this->runParallel(); - } + // 运行定时器 + $this->runLoop(); } catch (\Throwable $e) { throw $e; } @@ -207,108 +200,7 @@ class TimerBase extends Command } } - public function runParallel() - { - // 重新构造命令行参数,以便兼容workerman的命令 - global $argv; - $argv = []; - array_unshift($argv, 'think', 'start'); - - $host = $this->host; - $site_host = $this->siteHost; - $output = $this->output; - $input = $this->input; - - $call_list = $this->callList; - - $worker = new Worker(); - $worker->count = 1; - $worker->name = 'timer_request'; - $worker->timerRequestList = $this->requestList; - - $worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) { - $options = [ - 'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000), - 'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400), - 'connect_timeout' => Config::get('timer.connect_timeout', 86400), - 'timeout' => Config::get('timer.timeout', 86400), - ]; - $http = new HttpClient($options); - - Timer::add(1, function () use ($worker, $host, $site_host, $output, $input, $http) { - $request_list = $worker->timerRequestList; - foreach ($request_list as $request_item) { - $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']); - if (!isset($request_item['is_running'])) { - $request_item['is_running'] = false; - } - - if ($request_item['is_running']) { - $output->writeln('进行中,跳过'); - continue; - } - - if (!isset($request_item['last_run_time'])) { - $request_item['last_run_time'] = 0; - } - - if (time() - $request_item['last_run_time'] < $request_item['frequency']) { - continue; - } - - $request_item['is_running'] = true; - $request_item['last_run_time'] = time(); - - // run_type 调度检查(节流之后、实际执行之前) - if (!$this->shouldExecuteTask($request_item)) { - $request_item['is_running'] = false; - continue; - } - - $http->request($host . $request_item['target'], [ - 'headers' => [ - 'Host' => $site_host, - 'Accept' => 'application/json,text/plain', - ], - 'success' => function ($response) use ($request_item, $output) { - $request_item['is_running'] = false; - $output->writeln((string) $response->getBody()); - }, - 'error' => function ($exception) use ($request_item, $output) { - $request_item['is_running'] = false; - $output->writeln($exception); - }, - ]); - } - }); - }; - - // 运行call任务 - foreach ($call_list as $call_item) { - $worker_call = new Worker(); - $worker_call->count = 1; - $worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id']; - $worker_call->onWorkerStart = function () use ($worker_call, $call_item, $output, $input) { - Timer::add(1, function () use ($worker_call, $call_item, $output, $input) { - // TODO:统一通过相同的方法判断任务是否该执行 - $cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id']; - $cache_tag = 'system_timer'; - $last_exec_time = Cache::get($cache_key, 0); - if ($last_exec_time >= time() - $call_item['frequency']) { - return; - } - Cache::tag($cache_tag)->set($cache_key, time()); - - call_user_func($call_item['target']); - $output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']); - }); - }; - } - - Worker::runAll(); - } - - public function runNormal() + public function runLoop() { $host = $this->host; $site_host = $this->siteHost; @@ -316,8 +208,17 @@ class TimerBase extends Command $output = $this->output; $input = $this->input; + $handler = new CurlMultiHandler([ + 'select_timeout' => Config::get('timer.select_timeout', 0.001), + 'max_handles' => Config::get('timer.max_handles', 100), + ]); + $stack = HandlerStack::create($handler); + $client = new Client([ + 'handler' => $stack, 'base_uri' => $host, + 'timeout' => Config::get('timer.timeout', 86400), + 'connect_timeout' => Config::get('timer.connect_timeout', 30), 'headers' => [ 'Host' => $site_host, 'Accept' => 'application/json,text/plain', @@ -325,21 +226,28 @@ class TimerBase extends Command 'verify' => false, ]); + $pending = []; + while (true) { - // 运行请求任务 try { - $list_promises = []; - foreach ($request_list as $request_item) { + $has_new_task = false; + + // --- site 任务:非阻塞发火 --- + foreach ($request_list as $request_item) { $name = $request_item['name']; + $key = $name . '_' . $request_item['concurrency_id']; + + // 已在飞 -> 跳过 + if (isset($pending[$key])) { + continue; + } $cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id']; $cache_tag = 'system_timer'; $last_exec_time = Cache::get($cache_key, 0); - if ($last_exec_time >= time() - $request_item['frequency']) { continue; } - Cache::tag($cache_tag)->set($cache_key, time()); // run_type 调度检查(Cache 节流之后、实际执行之前) @@ -348,19 +256,34 @@ class TimerBase extends Command } $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']); - $list_promises[$request_item['name']] = $client->getAsync($request_item['target']); + $promise = $client->getAsync($request_item['target']); + $pending[$key] = $promise; + $has_new_task = true; + + $promise->then( + function ($response) use ($key, &$pending, $output) { + unset($pending[$key]); + $output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' finished'); + }, + function ($reason) use ($key, &$pending, $output) { + unset($pending[$key]); + $output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' error: ' . $reason->getMessage()); + } + ); } - if (empty($list_promises)) { + // --- 非阻塞推进 curl --- + $handler->tick(); + Utils::queue()->run(); + + // --- 空闲提示 --- + if (empty($pending) && !$has_new_task) { if (!$input->hasOption('quiet')) { $output->writeln(date('Y-m-d H:i:s') . ' no request'); } - } else { - $results = Utils::unwrap($list_promises); - $output->writeln(date('Y-m-d H:i:s') . ': request all finished'); } + } catch (\Throwable $th) { - // throw $th; $output->writeln('error:' . $th->getMessage()); Log::error($th->getMessage()); } @@ -381,10 +304,23 @@ class TimerBase extends Command } if ($input->hasOption('temp')) { + if (!empty($pending)) { + try { + Utils::unwrap($pending); + } catch (\Throwable $th) { + $output->writeln('error:' . $th->getMessage()); + Log::error($th->getMessage()); + } + } break; } - sleep(1); + // --- sleep 策略 --- + if (empty($pending)) { + usleep(200000); + } else { + usleep(50000); + } } } }