新增通用的虚拟数据模型;新增定时任务多进程;新增多进程阻塞模式的定时器;

This commit is contained in:
augushong
2024-10-04 23:03:18 +08:00
parent ee80d6eee5
commit e53445e41c
6 changed files with 198 additions and 22 deletions

View File

@@ -4,6 +4,7 @@ declare(strict_types=1);
namespace base\common\command;
use app\common\model\VirtualModel;
use GuzzleHttp\Client;
use GuzzleHttp\Promise\Utils;
use think\console\Command;
@@ -11,10 +12,19 @@ use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\facade\Cache;
use think\facade\Config;
use think\facade\Log;
use Workerman\Http\Client as HttpClient;
use Workerman\Timer;
use Workerman\Worker;
class TimerBase extends Command
{
protected $host;
protected $siteDomain;
protected $siteHost;
protected $requestList;
protected function configure()
{
// 指令配置
@@ -22,7 +32,7 @@ class TimerBase extends Command
->addOption('temp', null, Option::VALUE_NONE)
->addOption('quit', null, Option::VALUE_NONE)
->addOption('local', null, Option::VALUE_NONE)
->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名','http://localhost')
->addOption('local-host', null, Option::VALUE_OPTIONAL, '本地域名', 'http://localhost')
->addOption('local-port', null, Option::VALUE_OPTIONAL, '本地端口', '80')
->setDescription('内置秒级定时器');
}
@@ -38,6 +48,7 @@ class TimerBase extends Command
return;
}
$site_host = parse_url($site_domain, PHP_URL_HOST);
$output->writeln('站点域名:' . $site_domain);
$host = $site_domain;
@@ -45,41 +56,154 @@ class TimerBase extends Command
$host = $input->getOption('local-host') . ':' . $input->getOption('local-port');
}
$config_list = include app_file_path('common/command/timer/config.php');
$request_list = [];
foreach ($config_list as $config_item) {
$config_item = static::initConfigItem($config_item);
if ($config_item['name'] == 'http_demo' && !env('adminsystem.is_demo', false)) {
continue;
}
$concurrency = $config_item['concurrency'];
for ($i = 0; $i < $concurrency; $i++) {
$target = $config_item['target'];
$params = [
'concurrency_id' => $i,
'concurrency_count' => $concurrency,
];
$target_info = parse_url($target);
$query_params = [];
if (isset($target_info['query'])) {
parse_str($target_info['query'], $query_params);
}
$query_params = array_merge($query_params, $params);
$target_info['query'] = http_build_query($query_params);
$target = unparse_url($target_info);
$config_item['target'] = $target;
$config_item['concurrency_id'] = $i;
$request_list[] = clone $config_item;
}
}
$this->host = $host;
$this->siteDomain = $site_domain;
$this->siteHost = $site_host;
$this->requestList = $request_list;
$timer_mode = Config::get('timer.mode', 'normal');
if ($timer_mode == 'normal') {
$this->runNormal();
} else {
$this->runParallel();
}
}
public function runParallel()
{
$host = $this->host;
$site_host = $this->siteHost;
$output = $this->output;
$input = $this->input;
$worker = new Worker();
$worker->count = 1;
$worker->name = 'start_timer_parallel';
$worker->timerRequestList = $this->requestList;
$worker->onWorkerStart = function () use ($worker, $host, $site_host, $output, $input) {
Timer::add(1, function () use ($worker, $host, $site_host, $output, $input) {
$request_list = $worker->timerRequestList;
foreach ($request_list as $request_item) {
if (!isset($request_item['is_running'])) {
$request_item['is_running'] = false;
}
if (!isset($request_item['last_run_time'])) {
$request_item['last_run_time'] = 0;
}
if (time() - $request_item['last_run_time'] < $request_item['frequency']) {
continue;
}
$request_item['is_running'] = true;
$request_item['last_run_time'] = time();
$output->writeln(date('Y-m-d H:i:s') . ': build site request async:' . $request_item['target']);
//
$options = [
'max_conn_per_addr' => Config::get('timer.max_conn_per_addr', 1000),
'keepalive_timeout' => Config::get('timer.keepalive_timeout', 86400),
'connect_timeout' => Config::get('timer.connect_timeout', 86400),
'timeout' => Config::get('timer.timeout', 86400),
];
$http = new HttpClient($options);
$http->request($host . $request_item['target'], [
'headers' => [
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
],
'success' => function ($response) use ($request_item) {
$request_item['is_running'] = false;
echo $response->getBody();
},
'error' => function ($exception) use ($request_item) {
$request_item['is_running'] = false;
echo $exception;
},
]);
}
});
};
Worker::runAll();
}
public function runNormal()
{
$host = $this->host;
$site_host = $this->siteHost;
$request_list = $this->requestList;
$output = $this->output;
$input = $this->input;
$client = new Client([
'base_uri' => $host,
'headers' => [
'Host' => $site_domain,
'Host' => $site_host,
'Accept' => 'application/json,text/plain',
],
'verify' => false,
]);
while (true) {
try {
$config_list = include app_file_path('common/command/timer/config.php');
$list_promises = [];
foreach ($config_list as $config_item) {
$config_item = static::initConfigItem($config_item);
foreach ($request_list as $request_item) {
$name = $request_item['name'];
$name = $config_item['name'];
if ($name == 'http_demo' && !env('adminsystem.is_demo', false)) {
continue;
}
$cache_key = 'timer_' . $name;
$cache_key = 'timer_' . $name . '_' . $request_item['concurrency_id'];
$cache_tag = 'system_timer';
$last_exec_time = Cache::get($cache_key, 0);
if ($last_exec_time >= time() - $config_item['frequency']) {
if ($last_exec_time >= time() - $request_item['frequency']) {
continue;
}
Cache::tag($cache_tag)->set($cache_key, time());
$type = $config_item['type'];
$type = $request_item['type'];
switch ($type) {
case 'site':
$output->writeln(date('Y-m-d H:i:s') . ': build site request async:' . $config_item['target']);
$list_promises[$config_item['name']] = $client->getAsync($config_item['target']);
$output->writeln(date('Y-m-d H:i:s') . ': build site request async:' . $request_item['target']);
$list_promises[$request_item['name']] = $client->getAsync($request_item['target']);
break;
@@ -118,6 +242,7 @@ class TimerBase extends Command
'type' => 'site',
'target' => '',
'frequency' => 600,
'concurrency' => 1,
];
$data = array_merge($default, $config);
@@ -125,7 +250,8 @@ class TimerBase extends Command
if ($data['frequency'] < 0) {
$data['frequency'] = 0;
}
$model_timer = new VirtualModel($data);
return $data;
return $model_timer;
}
}