chore: sync anytls v0.0.11 (#2276)
Some checks are pending
Test / test (1.20, macos-13) (push) Waiting to run
Test / test (1.20, macos-latest) (push) Waiting to run
Test / test (1.20, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.20, ubuntu-latest) (push) Waiting to run
Test / test (1.20, windows-latest) (push) Waiting to run
Test / test (1.21, macos-13) (push) Waiting to run
Test / test (1.21, macos-latest) (push) Waiting to run
Test / test (1.21, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.21, ubuntu-latest) (push) Waiting to run
Test / test (1.21, windows-latest) (push) Waiting to run
Test / test (1.22, macos-13) (push) Waiting to run
Test / test (1.22, macos-latest) (push) Waiting to run
Test / test (1.22, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.22, ubuntu-latest) (push) Waiting to run
Test / test (1.22, windows-latest) (push) Waiting to run
Test / test (1.23, macos-13) (push) Waiting to run
Test / test (1.23, macos-latest) (push) Waiting to run
Test / test (1.23, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.23, ubuntu-latest) (push) Waiting to run
Test / test (1.23, windows-latest) (push) Waiting to run
Test / test (1.24, macos-13) (push) Waiting to run
Test / test (1.24, macos-latest) (push) Waiting to run
Test / test (1.24, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.24, ubuntu-latest) (push) Waiting to run
Test / test (1.24, windows-latest) (push) Waiting to run
Test / test (1.25, macos-13) (push) Waiting to run
Test / test (1.25, macos-latest) (push) Waiting to run
Test / test (1.25, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.25, ubuntu-latest) (push) Waiting to run
Test / test (1.25, windows-latest) (push) Waiting to run
Trigger CMFA Update / trigger-CMFA-update (push) Waiting to run

Co-authored-by: anytls <anytls>
This commit is contained in:
anytls 2025-09-22 18:29:32 +09:00 committed by GitHub
parent 74a86f147b
commit e28c8e6a51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 80 additions and 57 deletions

View File

@ -46,7 +46,7 @@ func NewClient(ctx context.Context, config ClientConfig) *Client {
} }
// Initialize the padding state of this client // Initialize the padding state of this client
padding.UpdatePaddingScheme(padding.DefaultPaddingScheme, &c.padding) padding.UpdatePaddingScheme(padding.DefaultPaddingScheme, &c.padding)
c.sessionClient = session.NewClient(ctx, c.CreateOutboundTLSConnection, &c.padding, config.IdleSessionCheckInterval, config.IdleSessionTimeout, config.MinIdleSession) c.sessionClient = session.NewClient(ctx, c.createOutboundTLSConnection, &c.padding, config.IdleSessionCheckInterval, config.IdleSessionTimeout, config.MinIdleSession)
return c return c
} }
@ -63,7 +63,7 @@ func (c *Client) CreateProxy(ctx context.Context, destination M.Socksaddr) (net.
return conn, nil return conn, nil
} }
func (c *Client) CreateOutboundTLSConnection(ctx context.Context) (net.Conn, error) { func (c *Client) createOutboundTLSConnection(ctx context.Context) (net.Conn, error) {
conn, err := c.dialer.DialContext(ctx, N.NetworkTCP, c.server) conn, err := c.dialer.DialContext(ctx, N.NetworkTCP, c.server)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -66,23 +66,21 @@ func (c *Client) CreateStream(ctx context.Context) (net.Conn, error) {
var stream *Stream var stream *Stream
var err error var err error
for i := 0; i < 3; i++ { session = c.getIdleSession()
session, err = c.findSession(ctx) if session == nil {
session, err = c.createSession(ctx)
}
if session == nil { if session == nil {
return nil, fmt.Errorf("failed to create session: %w", err) return nil, fmt.Errorf("failed to create session: %w", err)
} }
stream, err = session.OpenStream() stream, err = session.OpenStream()
if err != nil { if err != nil {
_ = session.Close() session.Close()
continue return nil, fmt.Errorf("failed to create stream: %w", err)
}
break
}
if session == nil || stream == nil {
return nil, fmt.Errorf("too many closed session: %w", err)
} }
stream.dieHook = func() { stream.dieHook = func() {
// If Session is not closed, put this Stream to pool
if !session.IsClosed() { if !session.IsClosed() {
select { select {
case <-c.die.Done(): case <-c.die.Done():
@ -100,9 +98,7 @@ func (c *Client) CreateStream(ctx context.Context) (net.Conn, error) {
return stream, nil return stream, nil
} }
func (c *Client) findSession(ctx context.Context) (*Session, error) { func (c *Client) getIdleSession() (idle *Session) {
var idle *Session
c.idleSessionLock.Lock() c.idleSessionLock.Lock()
if !c.idleSession.IsEmpty() { if !c.idleSession.IsEmpty() {
it := c.idleSession.Iterate() it := c.idleSession.Iterate()
@ -110,12 +106,7 @@ func (c *Client) findSession(ctx context.Context) (*Session, error) {
c.idleSession.Remove(it.Key()) c.idleSession.Remove(it.Key())
} }
c.idleSessionLock.Unlock() c.idleSessionLock.Unlock()
return
if idle == nil {
s, err := c.createSession(ctx)
return s, err
}
return idle, nil
} }
func (c *Client) createSession(ctx context.Context) (*Session, error) { func (c *Client) createSession(ctx context.Context) (*Session, error) {
@ -127,7 +118,6 @@ func (c *Client) createSession(ctx context.Context) (*Session, error) {
session := NewClientSession(underlying, c.padding) session := NewClientSession(underlying, c.padding)
session.seq = c.sessionCounter.Add(1) session.seq = c.sessionCounter.Add(1)
session.dieHook = func() { session.dieHook = func() {
//logrus.Debugln("session died", session)
c.idleSessionLock.Lock() c.idleSessionLock.Lock()
c.idleSession.Remove(math.MaxUint64 - session.seq) c.idleSession.Remove(math.MaxUint64 - session.seq)
c.idleSessionLock.Unlock() c.idleSessionLock.Unlock()
@ -168,12 +158,11 @@ func (c *Client) idleCleanup() {
} }
func (c *Client) idleCleanupExpTime(expTime time.Time) { func (c *Client) idleCleanupExpTime(expTime time.Time) {
sessionToRemove := make([]*Session, 0, c.idleSession.Len()) activeCount := 0
sessionToClose := make([]*Session, 0, c.idleSession.Len())
c.idleSessionLock.Lock() c.idleSessionLock.Lock()
it := c.idleSession.Iterate() it := c.idleSession.Iterate()
activeCount := 0
for it.IsNotEnd() { for it.IsNotEnd() {
session := it.Value() session := it.Value()
key := it.Key() key := it.Key()
@ -190,12 +179,12 @@ func (c *Client) idleCleanupExpTime(expTime time.Time) {
continue continue
} }
sessionToRemove = append(sessionToRemove, session) sessionToClose = append(sessionToClose, session)
c.idleSession.Remove(key) c.idleSession.Remove(key)
} }
c.idleSessionLock.Unlock() c.idleSessionLock.Unlock()
for _, session := range sessionToRemove { for _, session := range sessionToClose {
session.Close() session.Close()
} }
} }

View File

@ -90,7 +90,7 @@ func (s *Session) Run() {
f := newFrame(cmdSettings, 0) f := newFrame(cmdSettings, 0)
f.data = settings.ToBytes() f.data = settings.ToBytes()
s.buffering = true s.buffering = true
s.writeFrame(f) s.writeControlFrame(f)
go s.recvLoop() go s.recvLoop()
} }
@ -119,7 +119,7 @@ func (s *Session) Close() error {
} }
s.streamLock.Lock() s.streamLock.Lock()
for _, stream := range s.streams { for _, stream := range s.streams {
stream.Close() stream.closeLocally()
} }
s.streams = make(map[uint32]*Stream) s.streams = make(map[uint32]*Stream)
s.streamLock.Unlock() s.streamLock.Unlock()
@ -138,8 +138,6 @@ func (s *Session) OpenStream() (*Stream, error) {
sid := s.streamId.Add(1) sid := s.streamId.Add(1)
stream := newStream(sid, s) stream := newStream(sid, s)
//logrus.Debugln("stream open", sid, s.streams)
if sid >= 2 && s.peerVersion >= 2 { if sid >= 2 && s.peerVersion >= 2 {
s.synDoneLock.Lock() s.synDoneLock.Lock()
if s.synDone != nil { if s.synDone != nil {
@ -151,7 +149,7 @@ func (s *Session) OpenStream() (*Stream, error) {
s.synDoneLock.Unlock() s.synDoneLock.Unlock()
} }
if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil { if _, err := s.writeControlFrame(newFrame(cmdSYN, sid)); err != nil {
return nil, err return nil, err
} }
@ -207,7 +205,7 @@ func (s *Session) recvLoop() error {
if !s.isClient && !receivedSettingsFromClient { if !s.isClient && !receivedSettingsFromClient {
f := newFrame(cmdAlert, 0) f := newFrame(cmdAlert, 0)
f.data = []byte("client did not send its settings") f.data = []byte("client did not send its settings")
s.writeFrame(f) s.writeControlFrame(f)
return nil return nil
} }
s.streamLock.Lock() s.streamLock.Lock()
@ -241,18 +239,18 @@ func (s *Session) recvLoop() error {
stream, ok := s.streams[sid] stream, ok := s.streams[sid]
s.streamLock.RUnlock() s.streamLock.RUnlock()
if ok { if ok {
stream.CloseWithError(fmt.Errorf("remote: %s", string(buffer))) stream.closeWithError(fmt.Errorf("remote: %s", string(buffer)))
} }
pool.Put(buffer) pool.Put(buffer)
} }
case cmdFIN: case cmdFIN:
s.streamLock.RLock() s.streamLock.Lock()
stream, ok := s.streams[sid] stream, ok := s.streams[sid]
s.streamLock.RUnlock() delete(s.streams, sid)
s.streamLock.Unlock()
if ok { if ok {
stream.Close() stream.closeLocally()
} }
//logrus.Debugln("stream fin", sid, s.streams)
case cmdWaste: case cmdWaste:
if hdr.Length() > 0 { if hdr.Length() > 0 {
buffer := pool.Get(int(hdr.Length())) buffer := pool.Get(int(hdr.Length()))
@ -274,10 +272,9 @@ func (s *Session) recvLoop() error {
m := util.StringMapFromBytes(buffer) m := util.StringMapFromBytes(buffer)
paddingF := s.padding.Load() paddingF := s.padding.Load()
if m["padding-md5"] != paddingF.Md5 { if m["padding-md5"] != paddingF.Md5 {
// logrus.Debugln("remote md5 is", m["padding-md5"])
f := newFrame(cmdUpdatePaddingScheme, 0) f := newFrame(cmdUpdatePaddingScheme, 0)
f.data = paddingF.RawScheme f.data = paddingF.RawScheme
_, err = s.writeFrame(f) _, err = s.writeControlFrame(f)
if err != nil { if err != nil {
pool.Put(buffer) pool.Put(buffer)
return err return err
@ -291,7 +288,7 @@ func (s *Session) recvLoop() error {
f.data = util.StringMap{ f.data = util.StringMap{
"v": "2", "v": "2",
}.ToBytes() }.ToBytes()
_, err = s.writeFrame(f) _, err = s.writeControlFrame(f)
if err != nil { if err != nil {
pool.Put(buffer) pool.Put(buffer)
return err return err
@ -329,7 +326,7 @@ func (s *Session) recvLoop() error {
} }
} }
case cmdHeartRequest: case cmdHeartRequest:
if _, err := s.writeFrame(newFrame(cmdHeartResponse, sid)); err != nil { if _, err := s.writeControlFrame(newFrame(cmdHeartResponse, sid)); err != nil {
return err return err
} }
case cmdHeartResponse: case cmdHeartResponse:
@ -364,14 +361,31 @@ func (s *Session) streamClosed(sid uint32) error {
if s.IsClosed() { if s.IsClosed() {
return io.ErrClosedPipe return io.ErrClosedPipe
} }
_, err := s.writeFrame(newFrame(cmdFIN, sid)) _, err := s.writeControlFrame(newFrame(cmdFIN, sid))
s.streamLock.Lock() s.streamLock.Lock()
delete(s.streams, sid) delete(s.streams, sid)
s.streamLock.Unlock() s.streamLock.Unlock()
return err return err
} }
func (s *Session) writeFrame(frame frame) (int, error) { func (s *Session) writeDataFrame(sid uint32, data []byte) (int, error) {
dataLen := len(data)
buffer := buf.NewSize(dataLen + headerOverHeadSize)
buffer.WriteByte(cmdPSH)
binary.BigEndian.PutUint32(buffer.Extend(4), sid)
binary.BigEndian.PutUint16(buffer.Extend(2), uint16(dataLen))
buffer.Write(data)
_, err := s.writeConn(buffer.Bytes())
buffer.Release()
if err != nil {
return 0, err
}
return dataLen, nil
}
func (s *Session) writeControlFrame(frame frame) (int, error) {
dataLen := len(frame.data) dataLen := len(frame.data)
buffer := buf.NewSize(dataLen + headerOverHeadSize) buffer := buf.NewSize(dataLen + headerOverHeadSize)
@ -379,12 +393,18 @@ func (s *Session) writeFrame(frame frame) (int, error) {
binary.BigEndian.PutUint32(buffer.Extend(4), frame.sid) binary.BigEndian.PutUint32(buffer.Extend(4), frame.sid)
binary.BigEndian.PutUint16(buffer.Extend(2), uint16(dataLen)) binary.BigEndian.PutUint16(buffer.Extend(2), uint16(dataLen))
buffer.Write(frame.data) buffer.Write(frame.data)
s.conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
_, err := s.writeConn(buffer.Bytes()) _, err := s.writeConn(buffer.Bytes())
buffer.Release() buffer.Release()
if err != nil { if err != nil {
s.Close()
return 0, err return 0, err
} }
s.conn.SetWriteDeadline(time.Time{})
return dataLen, nil return dataLen, nil
} }

View File

@ -53,21 +53,35 @@ func (s *Stream) Write(b []byte) (n int, err error) {
return 0, os.ErrDeadlineExceeded return 0, os.ErrDeadlineExceeded
default: default:
} }
f := newFrame(cmdPSH, s.id) if s.dieErr != nil {
f.data = b return 0, s.dieErr
n, err = s.sess.writeFrame(f) }
n, err = s.sess.writeDataFrame(s.id, b)
return return
} }
// Close implements net.Conn // Close implements net.Conn
func (s *Stream) Close() error { func (s *Stream) Close() error {
return s.CloseWithError(io.ErrClosedPipe) return s.closeWithError(io.ErrClosedPipe)
} }
func (s *Stream) CloseWithError(err error) error { // closeLocally only closes Stream and don't notify remote peer
// if err != io.ErrClosedPipe { func (s *Stream) closeLocally() {
// logrus.Debugln(err) var once bool
// } s.dieOnce.Do(func() {
s.dieErr = net.ErrClosed
s.pipeR.Close()
once = true
})
if once {
if s.dieHook != nil {
s.dieHook()
s.dieHook = nil
}
}
}
func (s *Stream) closeWithError(err error) error {
var once bool var once bool
s.dieOnce.Do(func() { s.dieOnce.Do(func() {
s.dieErr = err s.dieErr = err
@ -128,7 +142,7 @@ func (s *Stream) HandshakeFailure(err error) error {
if once && err != nil && s.sess.peerVersion >= 2 { if once && err != nil && s.sess.peerVersion >= 2 {
f := newFrame(cmdSYNACK, s.id) f := newFrame(cmdSYNACK, s.id)
f.data = []byte(err.Error()) f.data = []byte(err.Error())
if _, err := s.sess.writeFrame(f); err != nil { if _, err := s.sess.writeControlFrame(f); err != nil {
return err return err
} }
} }
@ -142,7 +156,7 @@ func (s *Stream) HandshakeSuccess() error {
once = true once = true
}) })
if once && s.sess.peerVersion >= 2 { if once && s.sess.peerVersion >= 2 {
if _, err := s.sess.writeFrame(newFrame(cmdSYNACK, s.id)); err != nil { if _, err := s.sess.writeControlFrame(newFrame(cmdSYNACK, s.id)); err != nil {
return err return err
} }
} }