From cf379002a48ee089bf74a5f7bda3137359898cd4 Mon Sep 17 00:00:00 2001 From: alfred Date: Fri, 20 Mar 2020 22:22:35 +0800 Subject: [PATCH] Use socket_select to improve socket reader efficiency Fix:(When using ZoneTcpClient) 1. Many packets are stuck in receiving queue, cannot be processed in time; On Linux, use below command to check Recv-Q: ss -ntp dport = 2243 2. Above issue will cause websocket to be dropped from time to time. 3. Improve websocket message parsing, so we won't see unexpected messages; --- src/core/App.php | 4 +- src/plugin/ZoneTcpClient.php | 84 +++++++++++++++++++++++++++++------- src/util/TimeLock.php | 12 +++++- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/src/core/App.php b/src/core/App.php index 0f7f4dc..ac79de4 100644 --- a/src/core/App.php +++ b/src/core/App.php @@ -61,7 +61,7 @@ class App Log::error($error_msg); // Notice::push('error', $error_msg); } - yield new Delayed(1000); + yield call_user_func(array('BiliHelper\Plugin\\' . $taskName, 'Delayed'), []); } }); } @@ -102,4 +102,4 @@ class App } Loop::run(); } -} \ No newline at end of file +} diff --git a/src/plugin/ZoneTcpClient.php b/src/plugin/ZoneTcpClient.php index 9992c25..f6ae57a 100644 --- a/src/plugin/ZoneTcpClient.php +++ b/src/plugin/ZoneTcpClient.php @@ -14,6 +14,7 @@ use BiliHelper\Core\Log; use BiliHelper\Core\Curl; use BiliHelper\Util\TimeLock; +use Amp\Delayed; use Exception; use Socket\Raw\Factory; @@ -160,12 +161,13 @@ class ZoneTcpClient /** * @use 响应数据 * @param $msg + * @param $type * @return bool */ - private static function onMessage($msg) + private static function onMessage($msg, $type) { // 心跳后回复人气 - if (strlen($msg) == 4) { + if ($type == 3) { // $num = unpack('N', $msg)[1]; // Log::info("当前直播间现有 {$num} 人聚众搞基!"); return false; @@ -335,8 +337,8 @@ class ZoneTcpClient */ private static function unPackMsg($value) { - $res = unpack('N', $value); - return $res[1] - 16; + $res = unpack('Npacklen/nheadlen/nver/Nop/Nseq', $value); + return $res; } @@ -360,25 +362,45 @@ class ZoneTcpClient /** * @use 读数据 * @param $length + * @param $is_header * @return array|bool|false */ - private static function reader($length) + private static function reader($length, $is_header = false) { $data = false; try { - while (self::$client->selectRead(self::$socket_timeout)) { - $data = self::$client->read($length); - if (!$data) { - throw new Exception("Connection failure"); + if (self::$client->selectRead(self::$socket_timeout)) { + $ret = 0; + $socket = self::$client->getResource(); + while ($length) { + $cnt = 0; + $r = array($socket); + $w = NULL; + $e = NULL; + while ($cnt++ < 60) { + $ret = socket_select($r, $w, $e, 1); + if ($ret === false) + throw new Exception("Connection failure"); + if ($ret) + break; + } + $ret = socket_recv($socket, $buffer, $length, 0); + if ($ret < 1) { + print_r("Socket error: [{$ret}] [{$length}]" . PHP_EOL); + throw new Exception("Connection failure"); + } + $data .= $buffer; + unset($buffer); + $length -= $ret; } - if ($length == 16) $data = self::unPackMsg($data); - break; + if ($is_header) $data = self::unPackMsg($data); } } catch (Exception $exception) { self::triggerReConnect([ 'area_id' => self::$area_id, 'wait_time' => time() + 60 ]); + $data = false; } return $data; } @@ -415,14 +437,46 @@ class ZoneTcpClient self::$client = $client_info['client']; self::$area_id = $client_info['area_id']; self::$room_id = $client_info['room_id']; - $len_body = self::reader(16); - if (!$len_body) { + $head = self::reader(16, true); + if (!$head) { // 长度为0 ,空信息 continue; } + $length = isset($head['packlen']) ? $head['packlen'] : 16; + $type = isset($head['op']) ? $head['op'] : 0x0000; + $len_body = $length - 16; Log::debug("(len=$len_body)"); + if (!$len_body) + continue; $body = self::reader($len_body); - self::onMessage($body); + if ($body) + self::onMessage($body, $type); } } -} \ No newline at end of file + + /* + * @use replace delay by select + */ + public static function Delayed() + { + $r = []; + $w = NULL; + $e = NULL; + $delay = 0; + if (self::getLock() > time()) + return new Delayed(1000); + try { + foreach (self::$client_maps as $client_info) { + if ($client_info['client']) + $r[] = $client_info['client']->getResource(); + } + if (count($r) !== 0) + socket_select($r, $w, $e, 1); + else + $delay = 50; + } catch (Exception $exception) { + $delay = 1000; + } + return new Delayed($delay); + } +} diff --git a/src/util/TimeLock.php b/src/util/TimeLock.php index 42a077d..22877ea 100644 --- a/src/util/TimeLock.php +++ b/src/util/TimeLock.php @@ -10,6 +10,7 @@ namespace BiliHelper\Util; +use Amp\Delayed; trait TimeLock { @@ -33,6 +34,15 @@ trait TimeLock return static::$lock; } + /** + * @use used in Amp loop Delayed + * @return delayed + */ + public static function Delayed() + { + return new Delayed(1000); + } + /** * @use 定时 * @param int $hour @@ -44,4 +54,4 @@ trait TimeLock return strtotime('tomorrow') + ($hour * 60 * 60) - time(); } -} \ No newline at end of file +}