问题描述
我有一个使用 PHP 和 Ratchet 库的非常简单的 websocket。
当用户打开特定页面时,它会将用户 ID 发送到我的套接字,并且它应该更新该用户的状态(目前我只是在控制台中记录它),如下所示:
<input type="hidden" value="'.$account_id.'" id="account_id">
<input type="hidden" value="trial" id="request_type">
<script>
$(document).ready(function(){
var conn = new WebSocket('ws://127.0.0.1:8080');
conn.onopen = function(e){
console.log("Connection Opened!");
var account_id = $("#account_id").val();
var request_type = $("#request_type").val();
var data = {account_id: account_id,request_type: request_type};
conn.send(JSON.stringify(data));
}
conn.onclose = function(e){
console.log("Connection Closed!");
}
conn.onmessage = function(e) {
var data = JSON.parse(e.data);
console.log(data);
};
conn.onerror = function(e){
var data = JSON.parse(e.data);
console.log(data);
}
})
</script>
那么我的socket脚本如下:
set_time_limit(0);
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '../vendor/autoload.PHP';
class socket implements MessageComponentInterface{
protected $clients;
public function __construct(){
$this->clients = new \SplObjectStorage;
echo 'Server Started.'.PHP_EOL;
}
public function onopen(ConnectionInterface $socket){
$this->clients->attach($socket);
echo 'New connection '.$socket->resourceId.'!'.PHP_EOL;
}
public function onClose(ConnectionInterface $socket) {
$this->clients->detach($socket);
echo 'Connection '.$socket->resourceId.' has disconnected'.PHP_EOL;
}
public function onError(ConnectionInterface $socket,\Exception $e) {
echo 'An error has occurred: '.$e->getMessage().'!'.PHP_EOL;
$socket->close();
}
public function onMessage(ConnectionInterface $from,$json){
echo 'Connection '.$from->resourceId.' sent '.$json.PHP_EOL;
$data = json_decode($json,true);
$account_id = $data['account_id'];
$request_type = $data['request_type'];
try {
$conn = new PDO("MysqL:host=".$db_host.";port:".$db_port.";dbname=".$db_name."",$db_user,$db_pass);
$conn->setAttribute(PDO::ATTR_ERRMODE,PDO::ERRMODE_EXCEPTION);
}catch(PDOException $e){
echo $e->getMessage();
}
foreach ($this->clients as $client) {
if ($from->resourceId == $client->resourceId) {
if($request_type == 'trial'){
// while(true){
$response_array= [];
$stmt = $conn->prepare("SELECT * FROM table WHERE account_id=:account_id AND last_status_change=Now()");
$stmt->bindParam(':account_id',$account_id);
$stmt->execute();
$result = $stmt->setFetchMode(PDO::FETCH_ASSOC);
foreach($stmt->fetchAll() as $key=>$value) {
$response_array[$key] = $value;
}
if(!empty($response_array)){
foreach($response_array as $item){
$status = $item['status'];
}
$response = array(
'account_id' => $account_id,'status' => $status
);
var_dump($response);
$client->send(json_encode($response));
}
// sleep(5);
// }
}
}
}
}
}
$server = IoServer::factory(
new HttpServer(
new WsServer(
new socket()
)
),8080
);
$server->run();
就目前而言,它按预期工作,但仅在页面加载时状态发生更改时才提供当前状态,并且我将在控制台中看到状态,只要我取消注释 {{1 }} 循环以实际继续检查更新状态,当状态发生变化但客户端没有记录任何内容时,我的套接字将在命令行中执行 while()
结果。
我是 websockets 的新手,我一直在通过在 JS 中进行长时间轮询,该间隔将 var_dump()
发送到获取最新数据库结果的 PHP 脚本,但效率不高,并且当大量客户端处于活动状态并不断向文件发出请求时会导致问题,这反过来又会减慢数据库的速度。所以我不确定为什么 fetch()
循环会像这样影响它,或者我是否以正确的方式处理这个问题。
解决方法
用 if ($from->resourceId == $client->resourceId) {
替换这行 if ($from == $client) {
这个更改可能看起来很简单,但在 php 棘轮提供的示例 Chat 类中,为了避免将消息发送给发件人,他们有条件向客户端发送消息除了发件人,他们这样比较if ($from == $client) {
不仅是整个对象本身的一个resourceId!
while 循环不是它的工作原理。它会阻塞东西并无限且不必要地消耗资源。
你想要的是addPeriodicTimer()
。
定期检查需要升级的客户。
将这样的内容添加到您的引导程序中:
$reactEventLoop->addPeriodicTimer(5,function() use $messageHandler,$server {
// Fetch all changed clients at once and update their status
$clientsToUpdate = getUpdatedClients($server->app->clients);
foreach ($clientsToUpdate as $client) {
$client->send(json_encode($response));
}
});
这比任何其他方法都轻得多,因为您可以
- 使用单个准备好的数据库查询获取 N 个客户端状态
- 仅定期更新更改的客户端
- 不要将您的应用置于阻塞状态
Stackoverflow 上的其他资源将帮助您找到正确的位置:
How do I access the ratchet php periodic loop and client sending inside app?
Periodically sending messages to clients in Ratchet
,您应该使用 Ratchet 的 addPeriodicTimer
,尽管您必须公开 $clients
才能放置计时器。
也许您可以将它放在类中并且仍然是私有的,但我不确定它是否可以为每个客户端启动一个计时器。
无论如何,如您所见,您可以创建另一个公共函数,它实际上将在周期性计时器中完成这项工作(就像 while 循环一样)
然后在客户端连接后在 timerloop 内多次调用它,
为此,我还创建了一个公共 account_ids
以保留帐户 ID 的卡车
试一试,告诉我
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
require dirname(__DIR__) . '../vendor/autoload.php';
class socket implements MessageComponentInterface{
public $clients;
public $account_ids;
public function __construct(){
$this->clients = new \SplObjectStorage;
echo 'Server Started.'.PHP_EOL;
}
public function onOpen(ConnectionInterface $socket){
$this->clients->attach($socket);
echo 'New connection '.$socket->resourceId.'!'.PHP_EOL;
}
public function onClose(ConnectionInterface $socket) {
$this->clients->detach($socket);
echo 'Connection '.$socket->resourceId.' has disconnected'.PHP_EOL;
}
public function onError(ConnectionInterface $socket,\Exception $e) {
echo 'An error has occurred: '.$e->getMessage().'!'.PHP_EOL;
$socket->close();
}
public function onMessage(ConnectionInterface $from,$json){
echo 'Connection '.$from->resourceId.' sent '.$json.PHP_EOL;
$data = json_decode($json,true);
$account_id = $data['account_id'];
$request_type = $data['request_type'];
foreach ( $this->clients as $client ) {
if ( $from->resourceId == $client->resourceId ) {
if( $request_type == 'trial'){
$this->account_ids[$client->resourceId] = $account_id;
$this->checkStatus($client,$account_id);
}
}
}
}
public function checkStatus($client,$account_id){
try {
$conn = new PDO("mysql:host=".$db_host.";port:".$db_port.";dbname=".$db_name."",$db_user,$db_pass);
$conn->setAttribute(PDO::ATTR_ERRMODE,PDO::ERRMODE_EXCEPTION);
}catch(PDOException $e){
echo $e->getMessage();
}
$response_array= [];
$stmt = $conn->prepare("SELECT * FROM table WHERE account_id=:account_id AND last_status_change=now()");
$stmt->bindParam(':account_id',$account_id);
$stmt->execute();
$result = $stmt->setFetchMode(PDO::FETCH_ASSOC);
foreach($stmt->fetchAll() as $key=>$value) {
$response_array[$key] = $value;
}
if ( !empty($response_array) ) {
foreach($response_array as $item){
$status = $item['status'];
}
$response = array(
'account_id' => $account_id,'status' => $status
);
var_dump($response);
$client->send(json_encode($response));
}
}
}
$socket = new socket();
$server = IoServer::factory(
new HttpServer(
new WsServer(
$socket
)
),8080
);
$server->loop->addPeriodicTimer(5,function () use ($socket) {
foreach($socket->clients as $client) {
echo "Connection ".$client->resourceId." check\n";
$socket->checkStatus($client,$socket->account_ids[$client->resourceId]);
}
});
$server->run();