Files
ulthon_admin/extend/base/common/command/TimerBase.php
augushong 76b23d4c70 refactor(timer): 使用 CurlMultiHandler 替代 Workerman,统一为非阻塞模式
- 删除 runParallel() 方法和所有 Workerman 引用(死代码)
- 重写 runLoop() 为 Guzzle CurlMultiHandler 非阻塞事件循环
- 新增 pending 数组追踪进行中的请求,handler.tick() 非阻塞推进
- 自适应 sleep 策略(有任务 50ms,空闲 200ms)
- 简化 config/timer.php:移除 mode,适配 Guzzle 参数
- 更新 SKILL.md:移除 parallel 描述,修正 --quit 文档 bug
- 验证发现:--quiet 是 ThinkPHP 全局选项,不需要在 configure() 注册
- 验证发现:方法名不能用 run(),与 ThinkPHP Command::run() 签名冲突
2026-06-02 21:19:53 +08:00

327 lines
12 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
declare(strict_types=1);
namespace base\common\command;
use app\common\console\Command;
use app\common\service\HostService;
use app\common\service\TimerService;
use GuzzleHttp\Client;
use GuzzleHttp\Promise\Utils;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Db;
use think\facade\Log;
use GuzzleHttp\Handler\CurlMultiHandler;
use GuzzleHttp\HandlerStack;
class TimerBase extends Command
{
protected $host;
protected $siteDomain;
protected $siteHost;
protected $requestList;
protected $callList;
protected function configure()
{
parent::configure();
// 指令配置
$this->setName('timer')
->addOption('temp', null, Option::VALUE_NONE)
->addOption('local', null, Option::VALUE_NONE)
->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '8000')
->setDescription('内置秒级定时器');
}
protected function execute(Input $input, Output $output)
{
try {
// 指令输出
$output->writeln('start timer');
$site_domain = sysconfig('site', 'site_domain');
if (empty($site_domain)) {
$output->writeln('请前往后台设置站点域名site_domain配置项');
return;
}
$host = $site_domain;
if ($input->hasOption('local')) {
$host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
}
$output->writeln('站点域名:' . $host);
$site_host = parse_url($host, PHP_URL_HOST);
// 同步配置到数据库
TimerService::syncConfigToDatabase();
// 设置配置的任务
$timer_service = new TimerService();
$request_list = $timer_service->generateAllRequestList();
$call_list = $timer_service->generateAllCallList();
// 内置的节点注册任务
$system_host_register =
[
'name' => 'system_host_register', // 定时任务的名称,不能重复
'type' => 'call', // 定时任务的类型默认只支持site你也可以重写定时器命令行以支持其他命令
'target' => [HostService::class, 'heartbeat'], // 要访问的地址如果不是以https开头那么以后台的系统配置中读取相关配置如果没有配置则不执行
'frequency' => 30, // 执行频率单位填写10则每10秒过后执行一次
];
$system_host_call_list = TimerService::generateTaskInstanceFromConfig($system_host_register);
$call_list = array_merge($call_list, $system_host_call_list);
$this->host = $host;
$this->siteDomain = $site_domain;
$this->siteHost = $site_host;
$this->requestList = $request_list;
$this->callList = $call_list;
// 运行定时器
$this->runLoop();
} catch (\Throwable $e) {
throw $e;
}
}
/**
* 基于 run_type 判断当前节点是否应执行该任务(仅对 site 类型生效).
* 在 Cache 节流之后、实际执行之前调用。
*
* @param array|\ArrayAccess $task_item 任务实例(含 name/type/frequency 等)
* @return bool true=应执行 false=跳过
*/
protected function shouldExecuteTask($task_item): bool
{
// call 类型任务不受 run_type 调度层影响
if (($task_item['type'] ?? '') === 'call') {
return true;
}
$task_name = $task_item['name'] ?? '';
try {
$config = Db::name('system_timer_config')
->where('task_name', $task_name)
->find();
} catch (\Throwable $e) {
Log::warning('shouldExecuteTask: 查询 system_timer_config 失败 - ' . $e->getMessage());
return true;
}
// 无数据库记录时不阻塞(回退到默认行为)
if (empty($config)) {
return true;
}
// status=0 表示任务已禁用
if (isset($config['status']) && (int) $config['status'] === 0) {
return false;
}
$run_type = $config['run_type'] ?? 'auto';
switch ($run_type) {
case 'main':
// 仅主节点执行
$master_node = HostService::getMasterNode();
$node_id = HostService::getNodeId();
return ($master_node === $node_id);
case 'auto':
// 两阶段 DB 行锁竞争:竞争成功后释放锁,再执行任务
Db::startTrans();
try {
$row = Db::name('system_timer_config')
->where('task_name', $task_name)
->lock(true)
->find();
if ($row && !empty($row['last_execute_time']) && $row['last_execute_time'] > 0) {
if ((time() - (int) $row['last_execute_time']) < ($task_item['frequency'] ?? 0)) {
Db::commit();
return false; // 另一个节点刚执行过
}
}
Db::name('system_timer_config')
->where('task_name', $task_name)
->update([
'last_execute_node' => HostService::getNodeId(),
'last_execute_time' => time(),
'update_time' => time(),
]);
Db::commit();
return true;
} catch (\Throwable $e) {
Db::rollback();
Log::warning('shouldExecuteTask auto mode exception: ' . $e->getMessage());
return false;
}
case 'all':
// 所有节点都执行
return true;
case 'manual':
// 手动触发:检查 manual_trigger=1执行一次后重置为 0
if (!empty($config['manual_trigger'])) {
Db::name('system_timer_config')
->where('task_name', $task_name)
->update([
'manual_trigger' => 0,
'update_time' => time(),
]);
return true;
}
return false;
default:
// 未知 run_type 不阻塞
return true;
}
}
public function runLoop()
{
$host = $this->host;
$site_host = $this->siteHost;
$request_list = $this->requestList;
$output = $this->output;
$input = $this->input;
$handler = new CurlMultiHandler([
'select_timeout' => Config::get('timer.select_timeout', 0.001),
'max_handles' => Config::get('timer.max_handles', 100),
]);
$stack = HandlerStack::create($handler);
$client = new Client([
'handler' => $stack,
'base_uri' => $host,
'timeout' => Config::get('timer.timeout', 86400),
'connect_timeout' => Config::get('timer.connect_timeout', 30),
'headers' => [
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
],
'verify' => false,
]);
$pending = [];
while (true) {
try {
$has_new_task = false;
// --- site 任务:非阻塞发火 ---
foreach ($request_list as $request_item) {
$name = $request_item['name'];
$key = $name . '_' . $request_item['concurrency_id'];
// 已在飞 -> 跳过
if (isset($pending[$key])) {
continue;
}
$cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id'];
$cache_tag = 'system_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $request_item['frequency']) {
continue;
}
Cache::tag($cache_tag)->set($cache_key, time());
// run_type 调度检查Cache 节流之后、实际执行之前)
if (!$this->shouldExecuteTask($request_item)) {
continue;
}
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
$promise = $client->getAsync($request_item['target']);
$pending[$key] = $promise;
$has_new_task = true;
$promise->then(
function ($response) use ($key, &$pending, $output) {
unset($pending[$key]);
$output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' finished');
},
function ($reason) use ($key, &$pending, $output) {
unset($pending[$key]);
$output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' error: ' . $reason->getMessage());
}
);
}
// --- 非阻塞推进 curl ---
$handler->tick();
Utils::queue()->run();
// --- 空闲提示 ---
if (empty($pending) && !$has_new_task) {
if (!$input->hasOption('quiet')) {
$output->writeln(date('Y-m-d H:i:s') . ' no request');
}
}
} catch (\Throwable $th) {
$output->writeln('error:' . $th->getMessage());
Log::error($th->getMessage());
}
// 运行call任务
$call_list = $this->callList;
foreach ($call_list as $call_item) {
$name = $call_item['name'];
$cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id'];
$cache_tag = 'call_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $call_item['frequency']) {
continue;
}
Cache::tag($cache_tag)->set($cache_key, time());
call_user_func($call_item['target']);
$output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished');
}
if ($input->hasOption('temp')) {
if (!empty($pending)) {
try {
Utils::unwrap($pending);
} catch (\Throwable $th) {
$output->writeln('error:' . $th->getMessage());
Log::error($th->getMessage());
}
}
break;
}
// --- sleep 策略 ---
if (empty($pending)) {
usleep(200000);
} else {
usleep(50000);
}
}
}
}