Use socket_select to improve socket reader efficiency

Fix:(When using ZoneTcpClient)
1. Many packets are stuck in receiving queue, cannot be processed in time;
   On Linux, use below command to check Recv-Q:
       ss -ntp dport = 2243
2. Above issue will cause websocket to be dropped from time to time.
3. Improve websocket message parsing, so we won't see unexpected messages;
This commit is contained in:
alfred 2020-03-20 22:22:35 +08:00
parent aef330a8f3
commit faf989bd65
3 changed files with 82 additions and 18 deletions

View File

@ -61,7 +61,7 @@ class App
Log::error($error_msg); Log::error($error_msg);
// Notice::push('error', $error_msg); // Notice::push('error', $error_msg);
} }
yield new Delayed(1000); yield call_user_func(array('BiliHelper\Plugin\\' . $taskName, 'Delayed'), []);
} }
}); });
} }

View File

@ -14,6 +14,7 @@ use BiliHelper\Core\Log;
use BiliHelper\Core\Curl; use BiliHelper\Core\Curl;
use BiliHelper\Util\TimeLock; use BiliHelper\Util\TimeLock;
use Amp\Delayed;
use Exception; use Exception;
use Socket\Raw\Factory; use Socket\Raw\Factory;
@ -160,12 +161,13 @@ class ZoneTcpClient
/** /**
* @use 响应数据 * @use 响应数据
* @param $msg * @param $msg
* @param $type
* @return bool * @return bool
*/ */
private static function onMessage($msg) private static function onMessage($msg, $type)
{ {
// 心跳后回复人气 // 心跳后回复人气
if (strlen($msg) == 4) { if ($type == 3) {
// $num = unpack('N', $msg)[1]; // $num = unpack('N', $msg)[1];
// Log::info("当前直播间现有 {$num} 人聚众搞基!"); // Log::info("当前直播间现有 {$num} 人聚众搞基!");
return false; return false;
@ -335,8 +337,8 @@ class ZoneTcpClient
*/ */
private static function unPackMsg($value) private static function unPackMsg($value)
{ {
$res = unpack('N', $value); $res = unpack('Npacklen/nheadlen/nver/Nop/Nseq', $value);
return $res[1] - 16; return $res;
} }
@ -360,25 +362,45 @@ class ZoneTcpClient
/** /**
* @use 读数据 * @use 读数据
* @param $length * @param $length
* @param $is_header
* @return array|bool|false * @return array|bool|false
*/ */
private static function reader($length) private static function reader($length, $is_header = false)
{ {
$data = false; $data = false;
try { try {
while (self::$client->selectRead(self::$socket_timeout)) { if (self::$client->selectRead(self::$socket_timeout)) {
$data = self::$client->read($length); $ret = 0;
if (!$data) { $socket = self::$client->getResource();
while ($length) {
$cnt = 0;
$r = array($socket);
$w = NULL;
$e = NULL;
while ($cnt++ < 60) {
$ret = socket_select($r, $w, $e, 1);
if ($ret === false)
throw new Exception("Connection failure");
if ($ret)
break;
}
$ret = socket_recv($socket, $buffer, $length, 0);
if ($ret < 1) {
print_r("Socket error: [{$ret}] [{$length}]" . PHP_EOL);
throw new Exception("Connection failure"); throw new Exception("Connection failure");
} }
if ($length == 16) $data = self::unPackMsg($data); $data .= $buffer;
break; unset($buffer);
$length -= $ret;
}
if ($is_header) $data = self::unPackMsg($data);
} }
} catch (Exception $exception) { } catch (Exception $exception) {
self::triggerReConnect([ self::triggerReConnect([
'area_id' => self::$area_id, 'area_id' => self::$area_id,
'wait_time' => time() + 60 'wait_time' => time() + 60
]); ]);
$data = false;
} }
return $data; return $data;
} }
@ -415,14 +437,46 @@ class ZoneTcpClient
self::$client = $client_info['client']; self::$client = $client_info['client'];
self::$area_id = $client_info['area_id']; self::$area_id = $client_info['area_id'];
self::$room_id = $client_info['room_id']; self::$room_id = $client_info['room_id'];
$len_body = self::reader(16); $head = self::reader(16, true);
if (!$len_body) { if (!$head) {
// 长度为0 ,空信息 // 长度为0 ,空信息
continue; continue;
} }
$length = isset($head['packlen']) ? $head['packlen'] : 16;
$type = isset($head['op']) ? $head['op'] : 0x0000;
$len_body = $length - 16;
Log::debug("(len=$len_body)"); Log::debug("(len=$len_body)");
if (!$len_body)
continue;
$body = self::reader($len_body); $body = self::reader($len_body);
self::onMessage($body); if ($body)
self::onMessage($body, $type);
} }
} }
/*
* @use replace delay by select
*/
public static function Delayed()
{
$r = [];
$w = NULL;
$e = NULL;
$delay = 0;
if (self::getLock() > time())
return new Delayed(1000);
try {
foreach (self::$client_maps as $client_info) {
if ($client_info['client'])
$r[] = $client_info['client']->getResource();
}
if (count($r) !== 0)
socket_select($r, $w, $e, 1);
else
$delay = 50;
} catch (Exception $exception) {
$delay = 1000;
}
return new Delayed($delay);
}
} }

View File

@ -10,6 +10,7 @@
namespace BiliHelper\Util; namespace BiliHelper\Util;
use Amp\Delayed;
trait TimeLock trait TimeLock
{ {
@ -33,6 +34,15 @@ trait TimeLock
return static::$lock; return static::$lock;
} }
/**
* @use used in Amp loop Delayed
* @return delayed
*/
public static function Delayed()
{
return new Delayed(1000);
}
/** /**
* @use 定时 * @use 定时
* @param int $hour * @param int $hour