Merge pull request #23 from fgggid/zone

Use socket_select to improve socket reader efficiency
This commit is contained in:
Lkeme 2020-03-20 23:17:31 +08:00 committed by GitHub
commit d23ce53e16
3 changed files with 82 additions and 18 deletions

View File

@ -61,7 +61,7 @@ class App
Log::error($error_msg);
// Notice::push('error', $error_msg);
}
yield new Delayed(1000);
yield call_user_func(array('BiliHelper\Plugin\\' . $taskName, 'Delayed'), []);
}
});
}
@ -102,4 +102,4 @@ class App
}
Loop::run();
}
}
}

View File

@ -14,6 +14,7 @@ use BiliHelper\Core\Log;
use BiliHelper\Core\Curl;
use BiliHelper\Util\TimeLock;
use Amp\Delayed;
use Exception;
use Socket\Raw\Factory;
@ -160,12 +161,13 @@ class ZoneTcpClient
/**
* @use 响应数据
* @param $msg
* @param $type
* @return bool
*/
private static function onMessage($msg)
private static function onMessage($msg, $type)
{
// 心跳后回复人气
if (strlen($msg) == 4) {
if ($type == 3) {
// $num = unpack('N', $msg)[1];
// Log::info("当前直播间现有 {$num} 人聚众搞基!");
return false;
@ -335,8 +337,8 @@ class ZoneTcpClient
*/
private static function unPackMsg($value)
{
$res = unpack('N', $value);
return $res[1] - 16;
$res = unpack('Npacklen/nheadlen/nver/Nop/Nseq', $value);
return $res;
}
@ -360,25 +362,45 @@ class ZoneTcpClient
/**
* @use 读数据
* @param $length
* @param $is_header
* @return array|bool|false
*/
private static function reader($length)
private static function reader($length, $is_header = false)
{
$data = false;
try {
while (self::$client->selectRead(self::$socket_timeout)) {
$data = self::$client->read($length);
if (!$data) {
throw new Exception("Connection failure");
if (self::$client->selectRead(self::$socket_timeout)) {
$ret = 0;
$socket = self::$client->getResource();
while ($length) {
$cnt = 0;
$r = array($socket);
$w = NULL;
$e = NULL;
while ($cnt++ < 60) {
$ret = socket_select($r, $w, $e, 1);
if ($ret === false)
throw new Exception("Connection failure");
if ($ret)
break;
}
$ret = socket_recv($socket, $buffer, $length, 0);
if ($ret < 1) {
print_r("Socket error: [{$ret}] [{$length}]" . PHP_EOL);
throw new Exception("Connection failure");
}
$data .= $buffer;
unset($buffer);
$length -= $ret;
}
if ($length == 16) $data = self::unPackMsg($data);
break;
if ($is_header) $data = self::unPackMsg($data);
}
} catch (Exception $exception) {
self::triggerReConnect([
'area_id' => self::$area_id,
'wait_time' => time() + 60
]);
$data = false;
}
return $data;
}
@ -415,14 +437,46 @@ class ZoneTcpClient
self::$client = $client_info['client'];
self::$area_id = $client_info['area_id'];
self::$room_id = $client_info['room_id'];
$len_body = self::reader(16);
if (!$len_body) {
$head = self::reader(16, true);
if (!$head) {
// 长度为0 ,空信息
continue;
}
$length = isset($head['packlen']) ? $head['packlen'] : 16;
$type = isset($head['op']) ? $head['op'] : 0x0000;
$len_body = $length - 16;
Log::debug("(len=$len_body)");
if (!$len_body)
continue;
$body = self::reader($len_body);
self::onMessage($body);
if ($body)
self::onMessage($body, $type);
}
}
}
/*
* @use replace delay by select
*/
public static function Delayed()
{
$r = [];
$w = NULL;
$e = NULL;
$delay = 0;
if (self::getLock() > time())
return new Delayed(1000);
try {
foreach (self::$client_maps as $client_info) {
if ($client_info['client'])
$r[] = $client_info['client']->getResource();
}
if (count($r) !== 0)
socket_select($r, $w, $e, 1);
else
$delay = 50;
} catch (Exception $exception) {
$delay = 1000;
}
return new Delayed($delay);
}
}

View File

@ -10,6 +10,7 @@
namespace BiliHelper\Util;
use Amp\Delayed;
trait TimeLock
{
@ -33,6 +34,15 @@ trait TimeLock
return static::$lock;
}
/**
* @use used in Amp loop Delayed
* @return delayed
*/
public static function Delayed()
{
return new Delayed(1000);
}
/**
* @use 定时
* @param int $hour
@ -44,4 +54,4 @@ trait TimeLock
return strtotime('tomorrow') + ($hour * 60 * 60) - time();
}
}
}