mihomo/transport/anytls/session/stream.go
anytls e28c8e6a51
Some checks are pending
Test / test (1.20, macos-13) (push) Waiting to run
Test / test (1.20, macos-latest) (push) Waiting to run
Test / test (1.20, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.20, ubuntu-latest) (push) Waiting to run
Test / test (1.20, windows-latest) (push) Waiting to run
Test / test (1.21, macos-13) (push) Waiting to run
Test / test (1.21, macos-latest) (push) Waiting to run
Test / test (1.21, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.21, ubuntu-latest) (push) Waiting to run
Test / test (1.21, windows-latest) (push) Waiting to run
Test / test (1.22, macos-13) (push) Waiting to run
Test / test (1.22, macos-latest) (push) Waiting to run
Test / test (1.22, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.22, ubuntu-latest) (push) Waiting to run
Test / test (1.22, windows-latest) (push) Waiting to run
Test / test (1.23, macos-13) (push) Waiting to run
Test / test (1.23, macos-latest) (push) Waiting to run
Test / test (1.23, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.23, ubuntu-latest) (push) Waiting to run
Test / test (1.23, windows-latest) (push) Waiting to run
Test / test (1.24, macos-13) (push) Waiting to run
Test / test (1.24, macos-latest) (push) Waiting to run
Test / test (1.24, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.24, ubuntu-latest) (push) Waiting to run
Test / test (1.24, windows-latest) (push) Waiting to run
Test / test (1.25, macos-13) (push) Waiting to run
Test / test (1.25, macos-latest) (push) Waiting to run
Test / test (1.25, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.25, ubuntu-latest) (push) Waiting to run
Test / test (1.25, windows-latest) (push) Waiting to run
Trigger CMFA Update / trigger-CMFA-update (push) Waiting to run
chore: sync anytls v0.0.11 (#2276)
Co-authored-by: anytls <anytls>
2025-09-22 17:29:32 +08:00

165 lines
3.1 KiB
Go

package session
import (
"io"
"net"
"os"
"sync"
"time"
"github.com/metacubex/mihomo/transport/anytls/pipe"
)
// Stream implements net.Conn
type Stream struct {
id uint32
sess *Session
pipeR *pipe.PipeReader
pipeW *pipe.PipeWriter
writeDeadline pipe.PipeDeadline
dieOnce sync.Once
dieHook func()
dieErr error
reportOnce sync.Once
}
// newStream initiates a Stream struct
func newStream(id uint32, sess *Session) *Stream {
s := new(Stream)
s.id = id
s.sess = sess
s.pipeR, s.pipeW = pipe.Pipe()
s.writeDeadline = pipe.MakePipeDeadline()
return s
}
// Read implements net.Conn
func (s *Stream) Read(b []byte) (n int, err error) {
n, err = s.pipeR.Read(b)
if n == 0 && s.dieErr != nil {
err = s.dieErr
}
return
}
// Write implements net.Conn
func (s *Stream) Write(b []byte) (n int, err error) {
select {
case <-s.writeDeadline.Wait():
return 0, os.ErrDeadlineExceeded
default:
}
if s.dieErr != nil {
return 0, s.dieErr
}
n, err = s.sess.writeDataFrame(s.id, b)
return
}
// Close implements net.Conn
func (s *Stream) Close() error {
return s.closeWithError(io.ErrClosedPipe)
}
// closeLocally only closes Stream and don't notify remote peer
func (s *Stream) closeLocally() {
var once bool
s.dieOnce.Do(func() {
s.dieErr = net.ErrClosed
s.pipeR.Close()
once = true
})
if once {
if s.dieHook != nil {
s.dieHook()
s.dieHook = nil
}
}
}
func (s *Stream) closeWithError(err error) error {
var once bool
s.dieOnce.Do(func() {
s.dieErr = err
s.pipeR.Close()
once = true
})
if once {
if s.dieHook != nil {
s.dieHook()
s.dieHook = nil
}
return s.sess.streamClosed(s.id)
} else {
return s.dieErr
}
}
func (s *Stream) SetReadDeadline(t time.Time) error {
return s.pipeR.SetReadDeadline(t)
}
func (s *Stream) SetWriteDeadline(t time.Time) error {
s.writeDeadline.Set(t)
return nil
}
func (s *Stream) SetDeadline(t time.Time) error {
s.SetWriteDeadline(t)
return s.SetReadDeadline(t)
}
// LocalAddr satisfies net.Conn interface
func (s *Stream) LocalAddr() net.Addr {
if ts, ok := s.sess.conn.(interface {
LocalAddr() net.Addr
}); ok {
return ts.LocalAddr()
}
return nil
}
// RemoteAddr satisfies net.Conn interface
func (s *Stream) RemoteAddr() net.Addr {
if ts, ok := s.sess.conn.(interface {
RemoteAddr() net.Addr
}); ok {
return ts.RemoteAddr()
}
return nil
}
// HandshakeFailure should be called when Server fail to create outbound proxy
func (s *Stream) HandshakeFailure(err error) error {
var once bool
s.reportOnce.Do(func() {
once = true
})
if once && err != nil && s.sess.peerVersion >= 2 {
f := newFrame(cmdSYNACK, s.id)
f.data = []byte(err.Error())
if _, err := s.sess.writeControlFrame(f); err != nil {
return err
}
}
return nil
}
// HandshakeSuccess should be called when Server success to create outbound proxy
func (s *Stream) HandshakeSuccess() error {
var once bool
s.reportOnce.Do(func() {
once = true
})
if once && s.sess.peerVersion >= 2 {
if _, err := s.sess.writeControlFrame(newFrame(cmdSYNACK, s.id)); err != nil {
return err
}
}
return nil
}