mirror of
https://gitee.com/ulthon/ulthon_admin.git
synced 2026-07-01 15:32:48 +08:00
269 lines
8.9 KiB
PHP
269 lines
8.9 KiB
PHP
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace base\common\command;
|
||
|
||
use app\common\model\VirtualModel;
|
||
use GuzzleHttp\Client;
|
||
use GuzzleHttp\Promise\Utils;
|
||
use think\console\Command;
|
||
use think\console\Input;
|
||
use think\console\input\Option;
|
||
use think\console\Output;
|
||
use think\facade\Cache;
|
||
use think\facade\Config;
|
||
use think\facade\Log;
|
||
use Workerman\Http\Client as HttpClient;
|
||
use Workerman\Timer;
|
||
use Workerman\Worker;
|
||
|
||
class TimerBase extends Command
|
||
{
|
||
protected $host;
|
||
protected $siteDomain;
|
||
protected $siteHost;
|
||
protected $requestList;
|
||
|
||
protected function configure()
|
||
{
|
||
// 指令配置
|
||
$this->setName('timer')
|
||
->addOption('temp', null, Option::VALUE_NONE)
|
||
->addOption('quit', null, Option::VALUE_NONE)
|
||
->addOption('local', null, Option::VALUE_NONE)
|
||
->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
|
||
->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '80')
|
||
->setDescription('内置秒级定时器');
|
||
}
|
||
|
||
protected function execute(Input $input, Output $output)
|
||
{
|
||
// 指令输出
|
||
$output->writeln('start timer');
|
||
|
||
$site_domain = sysconfig('site', 'site_domain');
|
||
if (empty($site_domain)) {
|
||
$output->writeln('请前往后台设置站点域名(site_domain)配置项');
|
||
|
||
return;
|
||
}
|
||
|
||
$host = $site_domain;
|
||
|
||
if ($input->hasOption('local')) {
|
||
$host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
|
||
}
|
||
|
||
$output->writeln('站点域名:' . $host);
|
||
$site_host = parse_url($host, PHP_URL_HOST);
|
||
|
||
$config_list = include app_file_path('common/command/timer/config.php');
|
||
|
||
$request_list = [];
|
||
foreach ($config_list as $config_item) {
|
||
$config_item = static::initConfigItem($config_item);
|
||
|
||
if ($config_item['name'] == 'http_demo' && !env('adminsystem.is_demo', false)) {
|
||
continue;
|
||
}
|
||
|
||
$concurrency = $config_item['concurrency'];
|
||
|
||
for ($i = 0; $i < $concurrency; $i++) {
|
||
$target = $config_item['target'];
|
||
$params = [
|
||
'concurrency_id' => $i,
|
||
'concurrency_count' => $concurrency,
|
||
];
|
||
$target_info = parse_url($target);
|
||
$query_params = [];
|
||
if (isset($target_info['query'])) {
|
||
parse_str($target_info['query'], $query_params);
|
||
}
|
||
$query_params = array_merge($query_params, $params);
|
||
$target_info['query'] = http_build_query($query_params);
|
||
$target = unparse_url($target_info);
|
||
|
||
$config_item['target'] = $target;
|
||
$config_item['concurrency_id'] = $i;
|
||
|
||
$request_list[] = clone $config_item;
|
||
}
|
||
}
|
||
|
||
$this->host = $host;
|
||
$this->siteDomain = $site_domain;
|
||
$this->siteHost = $site_host;
|
||
$this->requestList = $request_list;
|
||
|
||
$timer_mode = Config::get('timer.mode', 'normal');
|
||
if ($timer_mode == 'normal') {
|
||
$this->runNormal();
|
||
} else {
|
||
$this->runParallel();
|
||
}
|
||
}
|
||
|
||
public function runParallel()
|
||
{
|
||
// 重新构造命令行参数,以便兼容workerman的命令
|
||
global $argv;
|
||
$argv = [];
|
||
array_unshift($argv, 'think', 'start');
|
||
|
||
$host = $this->host;
|
||
$site_host = $this->siteHost;
|
||
$output = $this->output;
|
||
$input = $this->input;
|
||
|
||
$worker = new Worker();
|
||
$worker->count = 1;
|
||
$worker->name = 'start_timer_parallel';
|
||
$worker->timerRequestList = $this->requestList;
|
||
|
||
$worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) {
|
||
$options = [
|
||
'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
|
||
'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
|
||
'connect_timeout' => Config::get('timer.connect_timeout', 86400),
|
||
'timeout' => Config::get('timer.timeout', 86400),
|
||
];
|
||
$http = new HttpClient($options);
|
||
|
||
Timer::add(1, function () use ($worker, $host, $site_host, $output, $input, $http) {
|
||
$request_list = $worker->timerRequestList;
|
||
foreach ($request_list as $request_item) {
|
||
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
|
||
if (!isset($request_item['is_running'])) {
|
||
$request_item['is_running'] = false;
|
||
}
|
||
|
||
if ($request_item['is_running']) {
|
||
$output->writeln('进行中,跳过');
|
||
continue;
|
||
}
|
||
|
||
if (!isset($request_item['last_run_time'])) {
|
||
$request_item['last_run_time'] = 0;
|
||
}
|
||
|
||
if (time() - $request_item['last_run_time'] < $request_item['frequency']) {
|
||
continue;
|
||
}
|
||
|
||
$request_item['is_running'] = true;
|
||
$request_item['last_run_time'] = time();
|
||
|
||
//
|
||
|
||
$http->request($host . $request_item['target'], [
|
||
'headers' => [
|
||
'Host' => $site_host,
|
||
'Accept' => 'application/json,text/plain',
|
||
],
|
||
'success' => function ($response) use ($request_item, $output) {
|
||
$request_item['is_running'] = false;
|
||
$output->writeln((string) $response->getBody());
|
||
},
|
||
'error' => function ($exception) use ($request_item, $output) {
|
||
$request_item['is_running'] = false;
|
||
$output->writeln($exception);
|
||
},
|
||
]);
|
||
}
|
||
});
|
||
};
|
||
|
||
Worker::runAll();
|
||
}
|
||
|
||
public function runNormal()
|
||
{
|
||
$host = $this->host;
|
||
$site_host = $this->siteHost;
|
||
$request_list = $this->requestList;
|
||
$output = $this->output;
|
||
$input = $this->input;
|
||
|
||
$client = new Client([
|
||
'base_uri' => $host,
|
||
'headers' => [
|
||
'Host' => $site_host,
|
||
'Accept' => 'application/json,text/plain',
|
||
],
|
||
'verify' => false,
|
||
]);
|
||
|
||
while (true) {
|
||
try {
|
||
$list_promises = [];
|
||
foreach ($request_list as $request_item) {
|
||
$name = $request_item['name'];
|
||
|
||
$cache_key = 'timer_' . $name . '_' . $request_item['concurrency_id'];
|
||
$cache_tag = 'system_timer';
|
||
$last_exec_time = Cache::get($cache_key, 0);
|
||
|
||
if ($last_exec_time >= time() - $request_item['frequency']) {
|
||
continue;
|
||
}
|
||
|
||
Cache::tag($cache_tag)->set($cache_key, time());
|
||
$type = $request_item['type'];
|
||
switch ($type) {
|
||
case 'site':
|
||
$output->writeln(date('Y-m-d H:i:s') . ': build site request async: ' . $request_item['target']);
|
||
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
|
||
|
||
break;
|
||
|
||
default:
|
||
$output->writeln(date('Y-m-d H:i:s') . 'unsupport type:' . $type);
|
||
break;
|
||
}
|
||
}
|
||
|
||
if (empty($list_promises)) {
|
||
if (!$input->hasOption('quit')) {
|
||
$output->writeln(date('Y-m-d H:i:s') . ' no request');
|
||
}
|
||
} else {
|
||
$results = Utils::unwrap($list_promises);
|
||
$output->writeln(date('Y-m-d H:i:s') . ': request all finished');
|
||
}
|
||
} catch (\Throwable $th) {
|
||
// throw $th;
|
||
$output->writeln('error:' . $th->getMessage());
|
||
Log::error($th->getMessage());
|
||
}
|
||
|
||
if ($input->hasOption('temp')) {
|
||
break;
|
||
}
|
||
|
||
sleep(1);
|
||
}
|
||
}
|
||
|
||
private static function initConfigItem($config)
|
||
{
|
||
$default = [
|
||
'name' => 'http_demo',
|
||
'type' => 'site',
|
||
'target' => '',
|
||
'frequency' => 600,
|
||
'concurrency' => 1,
|
||
];
|
||
|
||
$data = array_merge($default, $config);
|
||
|
||
if ($data['frequency'] < 0) {
|
||
$data['frequency'] = 0;
|
||
}
|
||
$model_timer = new VirtualModel($data);
|
||
|
||
return $model_timer;
|
||
}
|
||
}
|