mirror of
https://gitee.com/ulthon/ulthon_admin.git
synced 2026-07-01 15:32:48 +08:00
- 删除 runParallel() 方法和所有 Workerman 引用(死代码) - 重写 runLoop() 为 Guzzle CurlMultiHandler 非阻塞事件循环 - 新增 pending 数组追踪进行中的请求,handler.tick() 非阻塞推进 - 自适应 sleep 策略(有任务 50ms,空闲 200ms) - 简化 config/timer.php:移除 mode,适配 Guzzle 参数 - 更新 SKILL.md:移除 parallel 描述,修正 --quit 文档 bug - 验证发现:--quiet 是 ThinkPHP 全局选项,不需要在 configure() 注册 - 验证发现:方法名不能用 run(),与 ThinkPHP Command::run() 签名冲突
327 lines
12 KiB
PHP
327 lines
12 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace base\common\command;
|
||
|
||
use app\common\console\Command;
|
||
use app\common\service\HostService;
|
||
use app\common\service\TimerService;
|
||
use GuzzleHttp\Client;
|
||
use GuzzleHttp\Promise\Utils;
|
||
use think\console\Input;
|
||
use think\console\input\Option;
|
||
use think\console\Output;
|
||
use think\facade\Cache;
|
||
use think\facade\Config;
|
||
use think\facade\Db;
|
||
use think\facade\Log;
|
||
use GuzzleHttp\Handler\CurlMultiHandler;
|
||
use GuzzleHttp\HandlerStack;
|
||
|
||
class TimerBase extends Command
|
||
{
|
||
/** Base URI handed to the HTTP client; execute() sets it to site_domain, or to "<local-host>:<local-port>" when --local is given. */
protected $host;
|
||
/** Value of the `site_domain` system config entry as read in execute(). */
protected $siteDomain;
|
||
/** Host component of $host (via parse_url); sent as the `Host` header on every site request. */
protected $siteHost;
|
||
/** Site (HTTP) task instances produced by TimerService::generateAllRequestList(). */
protected $requestList;
|
||
/** Call task instances from TimerService::generateAllCallList() plus the built-in system_host_register task. */
protected $callList;
|
||
|
||
/**
 * Register the `timer` console command: name, description and CLI options.
 *
 * Options:
 *  - --temp        flag; runLoop() stops after a single pass when it is present
 *  - --local       flag; execute() targets <local-host>:<local-port> instead of site_domain
 *  - --local-host  optional value, default 'http://localhost'
 *  - --local-port  optional value, default '8000'
 */
protected function configure()
{
    parent::configure();

    // Each entry is forwarded to addOption() unchanged, in declaration order.
    $optionSpecs = [
        ['temp', null, Option::VALUE_NONE],
        ['local', null, Option::VALUE_NONE],
        ['local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost'],
        ['local-port', null, Option::VALUE_OPTIONAL, '本地端口', '8000'],
    ];

    // Keep chaining on whatever each call returns, exactly like the fluent form.
    $command = $this->setName('timer');
    foreach ($optionSpecs as $optionArgs) {
        $command = $command->addOption(...$optionArgs);
    }
    $command->setDescription('内置秒级定时器');
}
|
||
|
||
/**
 * Command entry point: resolve the request host, build the task lists and start the loop.
 *
 * Aborts with a console message when the `site_domain` system config entry is empty.
 * Any exception raised during setup or by runLoop() propagates to the caller.
 *
 * @param Input  $input  console input (reads --local, --local-host, --local-port)
 * @param Output $output console output
 */
protected function execute(Input $input, Output $output)
{
    $output->writeln('start timer');

    $siteDomain = sysconfig('site', 'site_domain');
    if (empty($siteDomain)) {
        $output->writeln('请前往后台设置站点域名(site_domain)配置项');

        return;
    }

    // --local redirects requests to <local-host>:<local-port> instead of the configured domain.
    $targetHost = $input->hasOption('local')
        ? $input->getOption('local-host') . ':' . $input->getOption('local-port')
        : $siteDomain;

    $output->writeln('站点域名:' . $targetHost);
    $hostName = parse_url($targetHost, PHP_URL_HOST);

    // Push the file-based timer configuration into the database before building tasks.
    TimerService::syncConfigToDatabase();

    $timerService = new TimerService();
    $requestTasks = $timerService->generateAllRequestList();
    $callTasks = $timerService->generateAllCallList();

    // Built-in node registration task, appended to the configured call tasks.
    $heartbeatTask = [
        'name' => 'system_host_register',               // task name
        'type' => 'call',                               // executed via call_user_func in runLoop()
        'target' => [HostService::class, 'heartbeat'],  // callable to invoke
        'frequency' => 30,                              // minimum interval between runs, in seconds
    ];
    $callTasks = array_merge(
        $callTasks,
        TimerService::generateTaskInstanceFromConfig($heartbeatTask)
    );

    $this->host = $targetHost;
    $this->siteDomain = $siteDomain;
    $this->siteHost = $hostName;
    $this->requestList = $requestTasks;
    $this->callList = $callTasks;

    $this->runLoop();
}
|
||
|
||
/**
 * Decide whether this node should run the given task, based on the task's row in
 * the `system_timer_config` table. Called after the cache throttle and before the
 * task is actually dispatched.
 *
 * Rules, in order:
 *  - tasks of type `call` always run;
 *  - if the config lookup fails or no row exists, the task runs (default behaviour);
 *  - status = 0 disables the task;
 *  - run_type `main`   : run only when this node is the master node;
 *  - run_type `auto`   : nodes compete through a row lock; the winner stamps
 *                        last_execute_node/last_execute_time and runs, others skip
 *                        while the stamp is younger than the task frequency;
 *  - run_type `all`    : every node runs;
 *  - run_type `manual` : run once when manual_trigger is set, then reset it;
 *  - any other value   : run.
 *
 * @param array|\ArrayAccess $task_item task instance (name/type/frequency ...)
 * @return bool true to execute, false to skip
 */
protected function shouldExecuteTask($task_item): bool
{
    // Call tasks are not subject to run_type scheduling.
    if (($task_item['type'] ?? '') === 'call') {
        return true;
    }

    $task_name = $task_item['name'] ?? '';

    try {
        $config = Db::name('system_timer_config')
            ->where('task_name', $task_name)
            ->find();
    } catch (\Throwable $e) {
        Log::warning('shouldExecuteTask: 查询 system_timer_config 失败 - ' . $e->getMessage());

        // A failed lookup must not block the task.
        return true;
    }

    // No database record: fall back to the default behaviour.
    if (empty($config)) {
        return true;
    }

    // status = 0 means the task is disabled.
    if (isset($config['status']) && (int) $config['status'] === 0) {
        return false;
    }

    $run_type = $config['run_type'] ?? 'auto';

    switch ($run_type) {
        case 'main':
            // Only the master node executes.
            $master_node = HostService::getMasterNode();
            $node_id = HostService::getNodeId();

            return $master_node === $node_id;

        case 'auto':
            // Compete for the row lock, record the claim, release the lock, then run.
            Db::startTrans();
            try {
                $row = Db::name('system_timer_config')
                    ->where('task_name', $task_name)
                    ->lock(true)
                    ->find();

                if ($row && !empty($row['last_execute_time']) && $row['last_execute_time'] > 0) {
                    if ((time() - (int) $row['last_execute_time']) < ($task_item['frequency'] ?? 0)) {
                        Db::commit();

                        return false; // another node ran it recently
                    }
                }

                Db::name('system_timer_config')
                    ->where('task_name', $task_name)
                    ->update([
                        'last_execute_node' => HostService::getNodeId(),
                        'last_execute_time' => time(),
                        'update_time' => time(),
                    ]);

                Db::commit();

                return true;
            } catch (\Throwable $e) {
                Db::rollback();
                Log::warning('shouldExecuteTask auto mode exception: ' . $e->getMessage());

                return false;
            }

        case 'all':
            // Every node executes.
            return true;

        case 'manual':
            // Cheap pre-check against the row read above.
            if (empty($config['manual_trigger'])) {
                return false;
            }

            // Consume the trigger atomically: the UPDATE only matches while the flag
            // is still set, so when several nodes see manual_trigger=1 at the same
            // time exactly one of them changes the row and runs the task.
            $claimed = Db::name('system_timer_config')
                ->where('task_name', $task_name)
                ->where('manual_trigger', '<>', 0)
                ->update([
                    'manual_trigger' => 0,
                    'update_time' => time(),
                ]);

            return $claimed > 0;

        default:
            // Unknown run_type does not block the task.
            return true;
    }
}
|
||
|
||
/**
 * Timer event loop.
 *
 * Each pass:
 *  1. For every site task that is not already in flight and whose cache throttle
 *     has expired, record the run time, ask shouldExecuteTask(), and fire a
 *     non-blocking GET through Guzzle's CurlMultiHandler.
 *  2. Advance the curl multi handle once (tick) and run the promise task queue so
 *     completion callbacks fire and remove finished requests from $pending.
 *  3. Run every due call task synchronously via call_user_func().
 *  4. With --temp: wait for all in-flight requests, then leave the loop.
 *     Otherwise sleep 200 ms when nothing is in flight, 50 ms when requests are pending.
 *
 * Errors in either task section are written to the console and logged; they never
 * terminate the loop.
 */
public function runLoop()
{
    $request_list = $this->requestList;
    $output = $this->output;
    $input = $this->input;

    $handler = new CurlMultiHandler([
        'select_timeout' => Config::get('timer.select_timeout', 0.001),
        'max_handles' => Config::get('timer.max_handles', 100),
    ]);

    $client = new Client([
        'handler' => HandlerStack::create($handler),
        'base_uri' => $this->host,
        'timeout' => Config::get('timer.timeout', 86400),
        'connect_timeout' => Config::get('timer.connect_timeout', 30),
        'headers' => [
            'Host' => $this->siteHost,
            'Accept' => 'application/json,text/plain',
        ],
        // NOTE(review): TLS certificate verification is disabled for every request — confirm this is intended.
        'verify' => false,
    ]);

    // In-flight site requests, keyed by "<task name>_<concurrency id>".
    $pending = [];

    while (true) {
        try {
            $has_new_task = false;

            // --- site tasks: fire without blocking ---
            foreach ($request_list as $request_item) {
                $name = $request_item['name'];
                $key = $name . '_' . $request_item['concurrency_id'];

                // Previous run of this slot is still in flight.
                if (isset($pending[$key])) {
                    continue;
                }

                $cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id'];
                $cache_tag = 'system_timer';
                $last_exec_time = Cache::get($cache_key, 0);
                if ($last_exec_time >= time() - $request_item['frequency']) {
                    continue;
                }
                Cache::tag($cache_tag)->set($cache_key, time());

                // run_type scheduling check (after the cache throttle, before dispatch).
                if (!$this->shouldExecuteTask($request_item)) {
                    continue;
                }

                $output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
                $promise = $client->getAsync($request_item['target']);
                $pending[$key] = $promise;
                $has_new_task = true;

                $promise->then(
                    function ($response) use ($key, &$pending, $output) {
                        unset($pending[$key]);
                        $output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' finished');
                    },
                    function ($reason) use ($key, &$pending, $output) {
                        unset($pending[$key]);
                        // A rejection reason is normally an exception, but the promise
                        // contract allows any value, so do not assume getMessage() exists.
                        $message = $reason instanceof \Throwable
                            ? $reason->getMessage()
                            : (is_scalar($reason) ? (string) $reason : gettype($reason));
                        $output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' error: ' . $message);
                    }
                );
            }

            // --- advance curl without blocking, then run settled-promise callbacks ---
            $handler->tick();
            Utils::queue()->run();

            // --- idle notice ---
            if (empty($pending) && !$has_new_task) {
                if (!$input->hasOption('quiet')) {
                    $output->writeln(date('Y-m-d H:i:s') . ' no request');
                }
            }
        } catch (\Throwable $th) {
            $output->writeln('error:' . $th->getMessage());
            Log::error($th->getMessage());
        }

        // --- call tasks: executed synchronously ---
        foreach ($this->callList as $call_item) {
            // Guard each task separately: a throwing callable (or cache failure) must
            // neither stop the daemon nor prevent the remaining call tasks from running.
            try {
                $name = $call_item['name'];
                $cache_key = 'timer_call_' . $name . '_' . $call_item['concurrency_id'];
                $cache_tag = 'call_timer';
                $last_exec_time = Cache::get($cache_key, 0);
                if ($last_exec_time >= time() - $call_item['frequency']) {
                    continue;
                }
                Cache::tag($cache_tag)->set($cache_key, time());
                call_user_func($call_item['target']);
                $output->writeln(date('Y-m-d H:i:s') . ': call function ' . $name . ' finished');
            } catch (\Throwable $th) {
                $output->writeln('error:' . $th->getMessage());
                Log::error($th->getMessage());
            }
        }

        if ($input->hasOption('temp')) {
            // Single-pass mode: wait for EVERY in-flight request before leaving.
            // Waiting one promise at a time means a failed request does not cause
            // the remaining ones to be abandoned. foreach iterates a snapshot, so
            // the callbacks may unset entries of $pending while we wait.
            foreach ($pending as $promise) {
                try {
                    $promise->wait();
                } catch (\Throwable $th) {
                    $output->writeln('error:' . $th->getMessage());
                    Log::error($th->getMessage());
                }
            }
            break;
        }

        // --- sleep strategy: poll faster while requests are in flight ---
        if (empty($pending)) {
            usleep(200000);
        } else {
            usleep(50000);
        }
    }
}
|
||
}
|