swoole创立websocket服务器

前文连接,阅读的时候最好参照EasySwoole2.1.2的源码

[toc]
swoole算是nodejs在php中的一种达成,异步响应请求,质量超强

前文提到的在系统安装Cache组件
Cache::getInstance()的时候,会去调用processManager去创造Cache的长河,然后以管道通讯的方式进行安装缓存和获得缓存。

1.是怎么着范围Laravel框架的速度?

$inst->run();//启动服务

1 安装准备

Cache是以单例方式完毕的。构造器会实行如下操作

Laravel框架运行的时候须求加载很多文本,再添加其出了名的生态环境好,所以在支付进度中我们就会发现有万分多的已经造好的车轱辘,那也就使得Laravel的1遍运营的磁盘IO尤其高(正是要加载很多文书嘛),博主在此之前的贰次博客Laravel5框架质量优化技术中也波及过部分优化,不过并没有实质的消除磁盘IO高的标题。

 那里其实调用的是Core的start方法ServerManager::getInstance()->start();

1.1 安装swoole前必须保障系统已经设置了下列软件

php-5.3.10 或更高版本
gcc-4.4 或更高版本
make
autoconf
pcre (centos系统能够执行命令:yum install pcre-devel)

//根据配置创建指定数目的Cache服务进程,然后启动。
$num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默认配置数目是1,在Config.php里'EASY_CACHE.PROCESS_NUM'=>1
if($num <= 0){
   return;
}
$this->cliTemp = new SplArray();//这个数组以后会给单元测试时候单独使用,正常模式这个数组是不使用的
//若是在主服务创建,而非单元测试调用
if(ServerManager::getInstance()->getServer()){
    //创建了一个swoole_table ,表名为__Cache,里面存储data(后面就讲到其实这里存储的是操作Cache的指令)作用是用来做GC(防止Cache被撑爆)
    TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[
        'data'=>[
            'type'=>Table::TYPE_STRING,
            'size'=>10*1024
        ],
        'microTime'=>[
            'type'=>Table::TYPE_STRING,
            'size'=>15
        ]
    ],2048);
    $this->processNum = $num;
    for ($i=0;$i < $num;$i++){
        ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class);
    }
}

既是已经知道了难点所在,那就很不难化解了,只要每一回运转Laravel的时候不都再次加载那么些文件就好了,那时候远近闻明的swoole就上台了。

其一措施重倘诺开发银行swoole服务的

1.2 下载并解压

下载地址
进去页面后选取download链接下的tar.gz的压缩包
下载源代码包后,解压
tar xzvf xxx.tar.gz,
在巅峰进入源码目录,执行上边包车型地铁一声令下进行编写翻译和装置
cd swoole
phpize
./configure –enable-swoole-debug
make
sudo make install

编写翻译参数依照自身的须要选拔,详情参见官方文书档案。

ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)那句话才是Cache的主导逻辑。

 

//创建主服务
$this->createMainServer();

1.3 编写翻译安装成功后,修改php.ini

在php.ini中加入 extension=swoole.so
通过在命令行使用 php-m查看,是不是安装了swoole

小心:如通重新编写翻译的话必要 make clean

ProcessManager::getInstance()那句话重要做了下边包车型客车操作
ProcessManager
的__construct构造函数成立了二个swoole_table,表名是process_hash_map

2.Swoole

 在那块代码里根本是基本,是在swoole执行start服务前安设相关安顿以及配备相关回调函数。具体代码如下

2 营造Swoole基本实例

TableManager::getInstance()->add(
    'process_hash_map',[
        'pid'=>[
            'type'=>Table::TYPE_INT,
            'size'=>10
        ]
    ],256
);

Swoole号称重新定义了PHP,它是二个PHP扩张,使得PHP能够使用异步的法子实行,就像是node一样,而且仍是能够动用socket,为PHP提供了一三种异步IO、事件驱动、并行数据结构作用。Swoole
能够广泛应用于网络、移动通讯、集团软件、云计算、互连网游戏、物联网(IOT)、车联网、智能家居等世界。它能够大大升级项目标天性。

 先给服务器配置相关运转参数

2.1 tcp服务器实例

(来自w3cschool教程

服务端代码:Server.php

<?php
// Server
class Server
{
    private $serv;

    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            'worker_num' => 8,
            'daemonize' => false,
            'max_request' => 10000,
            'dispatch_mode' => 2,
            'debug_mode'=> 1
        ));

        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('Connect', array($this, 'onConnect'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));

        $this->serv->start();
    }

    public function onStart( $serv ) {
        echo "Start\n";
    }

    public function onConnect( $serv, $fd, $from_id ) {
        $serv->send( $fd, "Hello {$fd}!" );
    }

    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
    }

    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }
}
// 启动服务器
$server = new Server();

从代码中能够见见,创设3个swoole_server基本分三步:

  1. 通过构造函数创建swoole_server对象
  2. 调用set函数设置swoole_server的相关配置选项
  3. 调用on函数设置相关回调函数
    关于set配置选项以及on回调函数的有血有肉表达,请参考小编收拾的swoole文档(
    配置选项)
    这边只交付简单介绍。onStart回调在server运转前被调用,onConnect在有新客户端连接过来时被调用,onReceive函数在有数量发送到server时被调用,onClose在有客户端断开连接时被调用。
    那里就能够大体看看哪些运用swoole:在onConnect处监听新的连天;在onReceive处接收数据并拍卖,然后可以调用send函数将处理结果发送出去;在onClose随地理客户端下线的事件。

客户端的代码:Client.php

<?php
class Client
{
    private $client;

    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect() {
        if( !$this->client->connect("127.0.0.1", 9501 , 1) ) {
            echo "Error: {$fp->errMsg}[{$fp->errCode}]\n";
        }
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";

        fwrite(STDOUT, "请输入消息:");  
        $msg = trim(fgets(STDIN));
        $this->client->send( $msg );
    }
}

$client = new Client();
$client->connect();

这里,通过swoole_client创制一个基于TCP的客户端实例,并调用connect函数向钦命的IP及端口发起连接请求。随后即可通过recv()和send()七个函数来接收和出殡和埋葬请求。须要留意的是,那里小编利用了默许的共同阻塞客户端,因而recv和send操作都会发生网络不通。

动用办法

进入到文件目录,在窗口1先运维php
Serve.php,然后再开多个窗口(窗口2)运维php Client.php
窗口1内容:

# root @ WENGINE in /data/learnSwoole [9:24:57] C:130
$ php Server.php
Start
Get Message From Client 1:ceshi1
Client 1 close connection

窗口2内容:

# root @ WENGINE in /data/learnSwoole [9:23:07] 
$ php Client.php
Get Message From Server:Hello 1!
请输入消息:ceshi1

addProcess($this->generateProcessName($i),CacheProcess::class);
$this->generateProcessName($i)那个代码很粗大略就是依照$i来设置进度名称
addProcess 是在processList存款和储蓄CacheProcess::class的实例,具体代码如下

 

$conf = Config::getInstance()->getConf("MAIN_SERVER");//获取Config.php 中配置的MAIN_SERVER数组
$runModel = $conf['RUN_MODEL'];//获取运行模式 默认是SWOOLE_PROCESS模式,使用进程模式,业务代码在Worker进程中执行
$host = $conf['HOST'];//获取运行的host 'HOST'=>'0.0.0.0',
$port = $conf['PORT'];//获取配置的运行端口为9501 'PORT'=>9501,
$setting = $conf['SETTING'];//这里设置的相关的配置项,这些配置做一些解释,参照https://wiki.swoole.com/wiki/page/274.html
'SETTING'=>[
         'task_worker_num' => 8, //配置Task进程的数量,配置此参数后将会启用task功能。所以Server务必要注册onTask、onFinish2个事件回调函数。如果没有注册,服务器程序将无法启动。
         'task_max_request'=>10,//设置task进程的最大任务数。一个task进程在处理完超过此数值的任务后将自动退出。这个参数是为了防止PHP进程内存溢出。如果不希望进程自动退出可以设置为0,炒鸡重要,进程中的数据,如果有一个global数组,或者全局变量,不设置这个就会不回收最终导致内存溢出
         'max_request'=>5000,//设置worker进程的最大任务数,这个和task_max_request功能一样,为了解决PHP进程内存溢出问题
         'worker_num'=>8//设置启动的Worker进程数
       ],
//其实这里还有2个配置,是前文中提到的,在前文19行$this->sysDirectoryInit();会初始化配置pid_file和log_file
pid_file //在Server启动时自动将master进程的PID写入到文件,在Server关闭时自动删除PID文件。
log_file //指定swoole错误日志文件。在swoole运行期发生的异常信息会记录到这个文件中。默认会打印到屏幕。
$sockType = $conf['SOCK_TYPE'];//指定当前运行的服务是什么服务器。有tcp服务,http服务,websocket服务。默认是tcp服务
switch ($conf['SERVER_TYPE']){
  case self::TYPE_SERVER:{
    $this->mainServer = new \swoole_server($host,$port,$runModel,$sockType);//创建mainServer,这里创建了一个tcp服务器
       break;
   }
  case self::TYPE_WEB_SERVER:{
    $this->mainServer = new \swoole_http_server($host,$port,$runModel,$sockType);//web方式是默认
       break;
   }
  case self::TYPE_WEB_SOCKET_SERVER:{
       $this->mainServer = new \swoole_websocket_server($host,$port,$runModel,$sockType);
       break;
   }
  default:{
    Trigger::throwable(new \Exception("unknown server type :{$conf['SERVER_TYPE']}"));
  }
}
$this->mainServer->set($setting);//将上面的相关服务启动配置到mainServer

2.2 web服务器

服务端代码 http_server.php

$http = new swoole_http_server("0.0.0.0", 9501);

$http->on('request', function ($request, $response) {
    var_dump($request->get, $request->post);
    $response->header("Content-Type", "text/html; charset=utf-8");
    $response->end("<h1>Hello Swoole. #".rand(1000, 9999)."</h1>");
});

$http->start();

Http服务器只供给关怀请求响应即可,所以只供给监听一个onRequest事件。当有新的Http请求进入就会触发此事件。事件回调函数有贰个参数,2个是$request对象,包含了请求的相干音信,如GET/POST请求的多寡。
别的二个是response对象,对request的响应得以由此操作response对象来形成。$response->end()方法表示输出一段HTML内容,并终止此呼吁。
● 0.0.0.0
表示监听所有IP地址,一台服务器可能还要有多少个IP,如127.0.0.1本地回环IP、192.168.1.100局域网IP、210.127.20.2
外网IP,那里也能够独自钦命监听贰个IP
● 9501 监听的端口,若是被挤占程序会抛出致命错误,中断执行。

$key = md5($processName);
if(!isset($this->processList[$key])){
    try{

        $process = new $processClass($processName,$args,$async);
        $this->processList[$key] = $process;
        return true;
    }catch (\Throwable $throwable){
        Trigger::throwable($throwable);
        return false;
    }
}else{
    trigger_error("you can not add the same name process : {$processName}.{$processClass}");
    return false;
}

3.使用Swoole提升Laravel的性能

创造默许的事件注册器,给服务注册私下认可的是事件处理函数

2.3 WebSocket服务器

服务端程序代码 ws_server.php

//创建websocket服务器对象,监听0.0.0.0:9502端口
$ws = new swoole_websocket_server("0.0.0.0", 9502);

//监听WebSocket连接打开事件
$ws->on('open', function ($ws, $request) {
    var_dump($request->fd, $request->get, $request->server);
    $ws->push($request->fd, "hello, welcome\n");
});

//监听WebSocket消息事件
$ws->on('message', function ($ws, $frame) {
    echo "Message: {$frame->data}\n";
    $ws->push($frame->fd, "server: {$frame->data}");
});

//监听WebSocket连接关闭事件
$ws->on('close', function ($ws, $fd) {
    echo "client-{$fd} is closed\n";
});

$ws->start();

WebSocket服务器是创立在Http服务器之上的长连接服务器,客户端首先会发送二个Http的请求与服务器举办握手。握手成功后会触发onOpen事件,表示连接已就绪,onOpen函数中得以获得$request对象,包罗了Http握手的连带音信,如GET参数、Cookie、Http头音信等。
确立连接后客户端与劳务器端就能够双向通讯了。
● 客户端向劳动器端发送音信时,服务器端触发onMessage事件回调

服务器端能够调用$server->push()向有些客户端(使用$fd标识符)发送音讯
● 服务器端能够安装onHandShake事件回调来手工业处理WebSocket握手
运营程序

客户端的代码

能够使用Chrome浏览器实行测试,JS代码为:

var wsServer = 'ws://127.0.0.1:9502';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) {
    console.log("Connected to WebSocket server.");
};

websocket.onclose = function (evt) {
    console.log("Disconnected");
};

websocket.onmessage = function (evt) {
    console.log('Retrieved data from server: ' + evt.data);
};

websocket.onerror = function (evt, e) {
    console.log('Error occured: ' + evt.data);
};
  • 不能够一直使用swoole_client与websocket服务器通信,swoole_client是TCP客户端
  • 必须兑现WebSocket协议才能和WebSocket服务器通讯,能够动用swoole/framework提供的PHP
    WebSocket客户端
  • WebSocket服务器除了提供WebSocket效率之外,实际上也得以拍卖Http长连接。只需求追加onRequest事件监听即可兑现Comet方案Http长轮询。

关于onRequest回调
swoole_websocket_server 继承自 swoole_http_server

  • 安装了onRequest回调,websocket服务器也能够同时作为http服务器
  • 未设置onRequest回调,websocket服务器收到http请求后会重返http
    400指鹿为马页面
  • 如若想透过收到http触发全数websocket的推送,需求留意功能域的题材,面向进程请使用“global”对swoole_websocket_server实行引用,面向对象能够把swoole_websocket_server设置成二个分子属性

能够创设更多的劳务器 参照官方文书档案尝试

那么CacheProcess::class的实例话做了怎么样操作呢
$this->cacheData = new
SplArray();//那里很要紧,为何这么说每一种Cache进度实际保存的缓存值都是在此间的,每种Cache进度都有谈得来的二个cacheData数组
$this->persistentTime =
Config::getInstance()->getConf(‘EASY_CACHE.PERSISTENT_TIME’);
parent::__construct($processName, $args);
CacheProcess::class继承于AbstractProcess
AbstractProcess的构造方法

在存活的车轱辘中,感觉上面那八个依旧那一个不易的,能够自动选择

$register = new EventRegister();
$this->finalHook($register);
EasySwooleEvent::mainServerCreate($this,$register);//这里是框架全局事件mainServerCreate主服务创建事件
 $events = $register->all();
//然后循环给swoole服务器绑定回调函数。这里同一个回调方法设置多个回调函数
foreach ($events as $event => $callback){
     $this->mainServer->on($event, function () use ($callback) {
       $ret = [];
       $args = func_get_args();
       foreach ($callback as $item) {
          array_push($ret,Invoker::callUserFuncArray($item, $args));
       }
       if(count($ret) > 1){
          return $ret;
       }
       return array_shift($ret);
     });
}

3 使用laravel5.5实现的前后台通讯实例

器重思路是运用php artisan
自行建造命令控克制务端,使用HTML5的websocket达成客户端效用

服务端:app/Console/Commands/Websocket.php内容

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use swoole_http_request;
use swoole_http_response;
use swoole_websocket_server;

class WebSocket extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'websocket 
                            {cmd=start : can use start|stop|status|restart}
                            {--daemon : set to run in daemonize mode}
                            ';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'swoole server control';


    /**
     * server
     *
     * @var swoole_websocket_server
     */
    private $server;

    /**
     *      * TYPE_ADMIN
     *           */
    const TYPE_ADMIN = 0X00;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * 处理不同command信息
     *
     * @return mixed
     */
    public function handle()
    {
        $command = $this->argument('cmd');
        $option = $this->option('daemon');

        switch ($command) {
            case 'start':
                $this->initWs($option);
                break;
            case 'stop':
                $res = $this->sendAdminRequest($command);
                if ($res){
                    $this->info('stop the server successfully');
                } else {
                    $this->info('the server is not running');
                }
                break;
            case 'status':
                $res = $this->sendAdminRequest($command);
                if ($res){
                    $this->info($res);
                } else {
                    $this->info('the server is not running');
                }
                break;
            case 'restart':
                $res = $this->sendAdminRequest($command);
                if ($res){
                    $this->info('restart the server successfully');
                } else {
                    $this->info('the server is not running');
                }
                break;
            default:
                $this->info('请按照下面格式输入命令:php artisan websocket {start|stop|status|restart}');
                break;
        }

    }

    //初始化服务端
    public function initWs($daemonize = false) 
    {
        if ($daemonize) {
            $this->info('Starting Websocke server in daemon mode...');
        } else {
            $this->info('Starting Websocke server in interactive mode...');
        }

        $server = new swoole_websocket_server('0.0.0.0', 9501);
        $server->set([
            'daemonize' => $daemonize,
            'log_file' => '/var/log/websocket.log'
        ]);

        $server->on('close', function ($server, $fd) {
            $this->info('close websocket server');
        });

        $server->on('open', function (swoole_websocket_server $server, $request) {
            $this->info('websocket open');
        });

        $server->on('open', [$this, 'onOpen']);
        $server->on('close', [$this, 'onClose']);
        $server->on('message', [$this, 'onMessage']);
        $server->on('request', [$this, 'onRequest']);

        $this->server = $server;
        $this->server->start();
    }

    public function onOpen(swoole_websocket_server $server, $request) 
    {
            $this->info('websocket open');
    }

    public function onClose($server, $fd) 
    {
            $this->info('close websocket server');
    }

    public function onMessage(swoole_websocket_server $server, $frame) 
    {
        $this->info($frame->data);
        $data = json_decode($frame->data, true);

        //对data进行逻辑处理
        $reply = '发送的信息是:' . $data['message'];
        $response = [
            'status' => true,
            'data' => $reply
        ];
        $server->push($frame->fd, json_encode($response));
    }

    //websocket客户端同样支持http协议
    public function onRequest(swoole_http_request $request, swoole_http_response $response) 
    {
        if ($request->post['type'] == self::TYPE_ADMIN) {
            $ret = json_encode($this->commandHandle($request->post['content']));
            return $response->end($ret);
        }
    }

    //操作命名
    public function commandHandle($command) {
        if ($command == 'status') {
            $this->info('handle status');
            return $this->server->stats();
        }
        if ($command == 'restart') {
            $this->info('handle restart');
            return $this->server->reload();
        }
        if ($command == 'stop') {
            $this->info('handle stop');
            return $this->server->stop() && $this->server->shutdown();
        }
        return 'Unknown Command';
    }

    //发送http请求
    public function sendAdminRequest($content) {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, "http://127.0.0.1:9501");
        curl_setopt($ch, CURLOPT_HEADER, 0);
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Expect:']);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_POSTFIELDS, [
            'type' => self::TYPE_ADMIN,
            'content' => $content
        ]);

        $response = curl_exec($ch);
        curl_close($ch);
        return $response;
    }

}

客户端内容

<!doctype html>
<html lang="{{ app()->getLocale() }}">
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <title>websocket client</title>
    </head>
    <body>
        <div>
            <input id="message-content" type="text" name="message" />
            <button onclick="sendMessage()">发送消息</button>
        </div>
    </body>
    <script>
        var wsServer = 'ws://115.159.81.46:9501';
        var websocket = new WebSocket(wsServer);
        websocket.onopen = function (evt) {
            console.log("Connected to WebSocket server.");
        };

        websocket.onclose = function (evt) {
            console.log("Disconnected");
        };

        websocket.onmessage = function (evt) {
            console.log('从服务器接收到json信息: ' + evt.data);
            alert('服务器返回信息:' + JSON.parse(evt.data).data);
        };

        websocket.onerror = function (evt, e) {
            console.log('Error occured: ' + evt.data);
        };

        function sendMessage(){
            var content = document.getElementById('message-content').value;
            var data = {
                message : content,
            }
            websocket.send(JSON.stringify(data));
        };
    </script>
</html>

启动websocket服务器

跻身系统根目录,
php artisan websocket [–daemon] //是还是不是利用daemon情势
php artisan websocket start|stop|status|restart //默认是start

$this->async = $async;
$this->args = $args;
$this->processName = $processName;
$this->swooleProcess = new \swoole_process([$this,'__start'],false,2);
ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服务会addProcess一个Cache的任务进程。
  • swooletw/laravel-swoole
  • garveen/laravoole

此间提一下finalHook具体做了什么样操作

__start方法重假设给swoole_table,表名为process_hash_map插入当前CacheProcess的长河名为key,进程IDpid为value。并且注册进度退出的事件。

博主选用了第②个,不要问小编怎么,任性!然后大家就起来吧!

$this->finalHook($register);

//实例化对象池管理
PoolManager::getInstance();
//register,先绑定一个workerStart回调函数。此事件在Worker进程/Task进程启动时发生。这里创建的对象可以在进程全局周期内使用。
$register->add($register::onWorkerStart,function (\swoole_server $server,int $workerId){
 PoolManager::getInstance()->__workerStartHook($workerId);
 $workerNum = Config::getInstance()->getConf('MAIN_SERVER.SETTING.worker_num');
 $name = Config::getInstance()->getConf('SERVER_NAME');
 if(PHP_OS != 'Darwin'){
  if($workerId <= ($workerNum -1)){//判断当前是否是worker进程
   $name = "{$name}_Worker_".$workerId;
  }else{//这个是task进程
   $name = "{$name}_Task_Worker_".$workerId;
  }
  cli_set_process_title($name);//设置当前的进程名称
  //图片
 }
});
//EventHelper 是一个框架提供的默认的事件处理函数(这些放到后面具体讲述)
EventHelper::registerDefaultOnTask($register);//注册默认的task回调,处理task任务的具体函数
EventHelper::registerDefaultOnFinish($register);//注册默认的task任务完成后的回调
EventHelper::registerDefaultOnPipeMessage($register);//注册pipeMessage回调函数
$conf = Config::getInstance()->getConf("MAIN_SERVER");
//如果是http服务器或者websocket,需要注册request回调函数
if($conf['SERVER_TYPE'] == self::TYPE_WEB_SERVER || $conf['SERVER_TYPE'] == self::TYPE_WEB_SOCKET_SERVER){
 if(!$register->get($register::onRequest)){
  EventHelper::registerDefaultOnRequest($register);//这里包含了请求,然后路由解析,处理返回的功能。就跟一般web框架类似
 }
}  
if(PHP_OS != 'Darwin'){
    $process->name($this->getProcessName());
}
TableManager::getInstance()->get('process_hash_map')->set(
    md5($this->processName),['pid'=>$this->swooleProcess->pid]
);
ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
if (extension_loaded('pcntl')) {
    pcntl_async_signals(true);
}
Process::signal(SIGTERM,function ()use($process){
    $this->onShutDown();
    TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName));
    swoole_event_del($process->pipe);
    $this->swooleProcess->exit(0);
});
if($this->async){
    swoole_event_add($this->swooleProcess->pipe, function(){
        $msg = $this->swooleProcess->read(64 * 1024);
        $this->onReceive($msg);
    });
}
$this->run($this->swooleProcess);

使用composer安装:

 系统运转后,在worker运行的时候,会开展更名的操作,如下图

$this->run($this->swooleProcess)这些函数是CacheProcess如若配置了persistent提姆e,就会敞开1个定时器定时去取$file

Config::getInstance()->getConf(‘TEMP_DI奥迪Q5’).”/{$processName}.data”;的数据备份,暗许是0也正是不会去做定时数据落地的操作

总的来看此间才是Cache组件在首先次实例化的时候做的有关业务,总括便是创办了钦定数量的Cache进度绑定到swoole服务器上。在全局的process_hash_map表中能找到相应的Cache进度ID。然后Cache进度是可以以管道情势来拓展通讯。

 

set缓存方法

public function set($key,$data)
{
    if(!ServerManager::getInstance()->isStart()){
        $this->cliTemp->set($key,$data);
    }
    if(ServerManager::getInstance()->getServer()){
        $num = $this->keyToProcessNum($key);
        $msg = new Msg();
        $msg->setCommand('set');
        $msg->setArg('key',$key);
        $msg->setData($data);
        ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(\swoole_serialize::pack($msg));//直接把需要缓存的数据,封装成msg然后write给hash映射到的Cache进程
    }
}

当进程取获得的时候会回调onReceive方法

public function onReceive(string $str,...$agrs)
{
    // TODO: Implement onReceive() method.

    $msg = \swoole_serialize::unpack($str);
    $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME);
    if(count($table) > 1900){
        //接近阈值的时候进行gc检测
        //遍历Table 依赖pcre 如果发现无法遍历table,检查机器是否安装pcre-devel
        //超过0.1s 基本上99.99%为无用数据。
        $time = microtime(true);
        foreach ($table as $key => $item){
            if(round($time - $item['microTime']) > 0.1){
                $table->del($key);
            }
        }
    }
    if($msg instanceof Msg){
        switch ($msg->getCommand()){
            case 'set':{
                $this->cacheData->set($msg->getArg('key'),$msg->getData());
                break;
            }
            case 'get':{
                $ret = $this->cacheData->get($msg->getArg('key'));
                $msg->setData($ret);
                $table->set($msg->getToken(),[
                    'data'=>\swoole_serialize::pack($msg),
                    'microTime'=>microtime(true)
                ]);
                break;
            }
            case 'del':{
                $this->cacheData->delete($msg->getArg('key'));
                break;
            }
            case 'flush':{
                $this->cacheData->flush();
                break;
            }
            case 'enQueue':{
                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg('key'),$que);
                }
                $que->enqueue($msg->getData());
                break;
            }
            case 'deQueue':{

                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg('key'),$que);
                }
                $ret = null;
                if(!$que->isEmpty()){
                    $ret = $que->dequeue();
                }
                $msg->setData($ret);
                //deQueue 有cli 服务未启动的请求,但无token
                if(!empty($msg->getToken())){
                    $table->set($msg->getToken(),[
                        'data'=>\swoole_serialize::pack($msg),
                        'microTime'=>microtime(true)
                    ]);
                }
                break;
            }
            case 'queueSize':{
                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                }
                $msg->setData($que->count());
                $table->set($msg->getToken(),[
                    'data'=>\swoole_serialize::pack($msg),
                    'microTime'=>microtime(true)
                ]);
                break;
            }
        }
    }
}

那里一开始会开始展览缓存GC确定保障内部存款和储蓄器不会撑爆

set方法会直接给$this->cacheData,设置缓存值。

 

get方法相比奇特,它会去给Cache进程发送get的一声令下,然后Cache读取到命令会将值写到_Cache,Swoole_table表中。然后再去读取(那些会有2个while循环,类似自旋)出缓存内容。那样的便宜,能够有限支撑能够读取到登时的数据缓存,不会因为高并发读取到最新的缓存值内容。而且仍是可以更实用的做gc,幸免Cache内部存款和储蓄器撑爆。

public function get($key,$timeOut = 0.01)
{
    if(!ServerManager::getInstance()->isStart()){
        return $this->cliTemp->get($key);
    }
    $num = $this->keyToProcessNum($key);
    $token = Random::randStr(9);//这个是一个凭证,是确保获取到自己此刻想获取的cache数据,和事务类似为了保证可重复读
    $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num));
    $msg = new  Msg();
    $msg->setArg('timeOut',$timeOut);
    $msg->setArg('key',$key);
    $msg->setCommand('get');
    $msg->setToken($token);
    $process->getProcess()->write(\swoole_serialize::pack($msg));
    return $this->read($token,$timeOut);
}

$process->getProcess()->write(\swoole_serialize::pack($msg))发那些包给Cache进度,Cache进度会举办上面那几个操作

$ret = $this->cacheData->get($msg->getArg('key'));//获取到当前的缓存值
$msg->setData($ret);
//将当前的内容设置到_Cache表中,token是请求的时候发过来的凭证原样拼装。这有什么好处呢,就是确保在高并发下,在A时刻获取的缓存,不会拿到后面B时刻更新的值。
$table->set($msg->getToken(),[
    'data'=>\swoole_serialize::pack($msg),
    'microTime'=>microtime(true)
]);

$this->read($token,$timeOut);

//这里的操作是直接从_Cache表中获取缓存数据,如果缓存存在并且进程调度没有超时,然后在表中将取过数据的内容删除掉返回
private function read($token,$timeOut)
{
    $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME);
    $start = microtime(true);
    $data = null;
    while(true){
        usleep(1);
        if($table->exist($token)){
            $data = $table->get($token)['data'];
            $data = \swoole_serialize::unpack($data);
            if(!$data instanceof Msg){
                $data = null;
            }
            break;
        }
        if(round($start - microtime(true),3) > $timeOut){
            break;
        }
    }
    $table->del($token);
    if($data){
        return $data->getData();
    }else{
        return null;
    }
}

 

composer require swooletw/laravel-swoole

发表评论

电子邮件地址不会被公开。 必填项已用*标注