refactor(timer): 使用 CurlMultiHandler 替代 Workerman,统一为非阻塞模式

- 删除 runParallel() 方法和所有 Workerman 引用(死代码)
- 重写 runLoop() 为 Guzzle CurlMultiHandler 非阻塞事件循环
- 新增 pending 数组追踪进行中的请求,handler.tick() 非阻塞推进
- 自适应 sleep 策略(有任务 50ms,空闲 200ms)
- 简化 config/timer.php:移除 mode,适配 Guzzle 参数
- 更新 SKILL.md:移除 parallel 描述,修正 --quit 文档 bug
- 验证发现:--quiet 是 ThinkPHP 全局选项,不需要在 configure() 注册
- 验证发现:方法名不能用 run(),与 ThinkPHP Command::run() 签名冲突
This commit is contained in:
augushong
2026-06-02 21:19:53 +08:00
parent c4fbd60bbc
commit 76b23d4c70
3 changed files with 81 additions and 144 deletions

View File

@@ -16,9 +16,8 @@ use think\facade\Cache;
use think\facade\Config;
use think\facade\Db;
use think\facade\Log;
use Workerman\Http\Client as HttpClient;
use Workerman\Timer;
use Workerman\Worker;
use GuzzleHttp\Handler\CurlMultiHandler;
use GuzzleHttp\HandlerStack;
class TimerBase extends Command
{
@@ -35,7 +34,6 @@ class TimerBase extends Command
// 指令配置
$this->setName('timer')
->addOption('temp', null, Option::VALUE_NONE)
->addOption('quiet', null, Option::VALUE_NONE)
->addOption('local', null, Option::VALUE_NONE)
->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '8000')
@@ -89,13 +87,8 @@ class TimerBase extends Command
$this->requestList = $request_list;
$this->callList = $call_list;
// 文本模式:正常运行定时器
$timer_mode = Config::get('timer.mode', 'normal');
if ($timer_mode == 'normal') {
$this->runNormal();
} else {
$this->runParallel();
}
// 运行定时器
$this->runLoop();
} catch (\Throwable $e) {
throw $e;
}
@@ -207,108 +200,7 @@ class TimerBase extends Command
}
}
public function runParallel()
{
// 重新构造命令行参数,以便兼容workerman的命令
global $argv;
$argv = [];
array_unshift($argv, 'think', 'start');
$host = $this->host;
$site_host = $this->siteHost;
$output = $this->output;
$input = $this->input;
$call_list = $this->callList;
$worker = new Worker();
$worker->count = 1;
$worker->name = 'timer_request';
$worker->timerRequestList = $this->requestList;
$worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) {
$options = [
'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
'connect_timeout' => Config::get('timer.connect_timeout', 86400),
'timeout' => Config::get('timer.timeout', 86400),
];
$http = new HttpClient($options);
Timer::add(1, function () use ($worker, $host, $site_host, $output, $input, $http) {
$request_list = $worker->timerRequestList;
foreach ($request_list as $request_item) {
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
if (!isset($request_item['is_running'])) {
$request_item['is_running'] = false;
}
if ($request_item['is_running']) {
$output->writeln('进行中,跳过');
continue;
}
if (!isset($request_item['last_run_time'])) {
$request_item['last_run_time'] = 0;
}
if (time() - $request_item['last_run_time'] < $request_item['frequency']) {
continue;
}
$request_item['is_running'] = true;
$request_item['last_run_time'] = time();
// run_type 调度检查(节流之后、实际执行之前)
if (!$this->shouldExecuteTask($request_item)) {
$request_item['is_running'] = false;
continue;
}
$http->request($host . $request_item['target'], [
'headers' => [
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
],
'success' => function ($response) use ($request_item, $output) {
$request_item['is_running'] = false;
$output->writeln((string) $response->getBody());
},
'error' => function ($exception) use ($request_item, $output) {
$request_item['is_running'] = false;
$output->writeln($exception);
},
]);
}
});
};
// 运行call任务
foreach ($call_list as $call_item) {
$worker_call = new Worker();
$worker_call->count = 1;
$worker_call->name = 'timer_call_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
$worker_call->onWorkerStart = function () use ($worker_call, $call_item, $output, $input) {
Timer::add(1, function () use ($worker_call, $call_item, $output, $input) {
// TODO:统一通过相同的方法判断任务是否该执行
$cache_key = 'timer_request_' . $call_item['name'] . '_' . $call_item['concurrency_id'];
$cache_tag = 'system_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $call_item['frequency']) {
return;
}
Cache::tag($cache_tag)->set($cache_key, time());
call_user_func($call_item['target']);
$output->writeln(date('Y-m-d H:i:s') . ': call ' . $call_item['name']);
});
};
}
Worker::runAll();
}
public function runNormal()
public function runLoop()
{
$host = $this->host;
$site_host = $this->siteHost;
@@ -316,8 +208,17 @@ class TimerBase extends Command
$output = $this->output;
$input = $this->input;
$handler = new CurlMultiHandler([
'select_timeout' => Config::get('timer.select_timeout', 0.001),
'max_handles' => Config::get('timer.max_handles', 100),
]);
$stack = HandlerStack::create($handler);
$client = new Client([
'handler' => $stack,
'base_uri' => $host,
'timeout' => Config::get('timer.timeout', 86400),
'connect_timeout' => Config::get('timer.connect_timeout', 30),
'headers' => [
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
@@ -325,21 +226,28 @@ class TimerBase extends Command
'verify' => false,
]);
$pending = [];
while (true) {
// 运行请求任务
try {
$list_promises = [];
foreach ($request_list as $request_item) {
$has_new_task = false;
// --- site 任务:非阻塞发火 ---
foreach ($request_list as $request_item) {
$name = $request_item['name'];
$key = $name . '_' . $request_item['concurrency_id'];
// 已在飞 -> 跳过
if (isset($pending[$key])) {
continue;
}
$cache_key = 'timer_request_' . $name . '_' . $request_item['concurrency_id'];
$cache_tag = 'system_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $request_item['frequency']) {
continue;
}
Cache::tag($cache_tag)->set($cache_key, time());
// run_type 调度检查Cache 节流之后、实际执行之前)
@@ -348,19 +256,34 @@ class TimerBase extends Command
}
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
$promise = $client->getAsync($request_item['target']);
$pending[$key] = $promise;
$has_new_task = true;
$promise->then(
function ($response) use ($key, &$pending, $output) {
unset($pending[$key]);
$output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' finished');
},
function ($reason) use ($key, &$pending, $output) {
unset($pending[$key]);
$output->writeln(date('Y-m-d H:i:s') . ': ' . $key . ' error: ' . $reason->getMessage());
}
);
}
if (empty($list_promises)) {
// --- 非阻塞推进 curl ---
$handler->tick();
Utils::queue()->run();
// --- 空闲提示 ---
if (empty($pending) && !$has_new_task) {
if (!$input->hasOption('quiet')) {
$output->writeln(date('Y-m-d H:i:s') . ' no request');
}
} else {
$results = Utils::unwrap($list_promises);
$output->writeln(date('Y-m-d H:i:s') . ': request all finished');
}
} catch (\Throwable $th) {
// throw $th;
$output->writeln('error:' . $th->getMessage());
Log::error($th->getMessage());
}
@@ -381,10 +304,23 @@ class TimerBase extends Command
}
if ($input->hasOption('temp')) {
if (!empty($pending)) {
try {
Utils::unwrap($pending);
} catch (\Throwable $th) {
$output->writeln('error:' . $th->getMessage());
Log::error($th->getMessage());
}
}
break;
}
sleep(1);
// --- sleep 策略 ---
if (empty($pending)) {
usleep(200000);
} else {
usleep(50000);
}
}
}
}