diff --git a/adapter/outbound/mieru.go b/adapter/outbound/mieru.go index 8ef9cfd7..d9feeba0 100644 --- a/adapter/outbound/mieru.go +++ b/adapter/outbound/mieru.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "net/netip" "strconv" "sync" @@ -40,6 +41,20 @@ type MieruOption struct { HandshakeMode string `proxy:"handshake-mode,omitempty"` } +type mieruPacketDialer struct { + C.Dialer +} + +var _ mierucommon.PacketDialer = (*mieruPacketDialer)(nil) + +func (pd mieruPacketDialer) ListenPacket(ctx context.Context, network, laddr, raddr string) (net.PacketConn, error) { + rAddrPort, err := netip.ParseAddrPort(raddr) + if err != nil { + return nil, fmt.Errorf("invalid address %s: %w", raddr, err) + } + return pd.Dialer.ListenPacket(ctx, network, laddr, rAddrPort) +} + // DialContext implements C.ProxyAdapter func (m *Mieru) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) { if err := m.ensureClientIsRunning(); err != nil { @@ -102,6 +117,7 @@ func (m *Mieru) ensureClientIsRunning() error { return err } config.Dialer = dialer + config.PacketDialer = mieruPacketDialer{Dialer: dialer} if err := m.client.Store(config); err != nil { return err } @@ -158,23 +174,21 @@ func (m *Mieru) Close() error { } func metadataToMieruNetAddrSpec(metadata *C.Metadata) mierumodel.NetAddrSpec { + spec := mierumodel.NetAddrSpec{ + Net: metadata.NetWork.String(), + } if metadata.Host != "" { - return mierumodel.NetAddrSpec{ - AddrSpec: mierumodel.AddrSpec{ - FQDN: metadata.Host, - Port: int(metadata.DstPort), - }, - Net: "tcp", + spec.AddrSpec = mierumodel.AddrSpec{ + FQDN: metadata.Host, + Port: int(metadata.DstPort), } } else { - return mierumodel.NetAddrSpec{ - AddrSpec: mierumodel.AddrSpec{ - IP: metadata.DstIP.AsSlice(), - Port: int(metadata.DstPort), - }, - Net: "tcp", + spec.AddrSpec = mierumodel.AddrSpec{ + IP: metadata.DstIP.AsSlice(), + Port: int(metadata.DstPort), } } + return spec } func buildMieruClientConfig(option MieruOption) (*mieruclient.ClientConfig, error) { @@ -182,7 +196,13 @@ func buildMieruClientConfig(option MieruOption) (*mieruclient.ClientConfig, erro return nil, fmt.Errorf("failed to validate mieru option: %w", err) } - transportProtocol := mierupb.TransportProtocol_TCP.Enum() + var transportProtocol = mierupb.TransportProtocol_UNKNOWN_TRANSPORT_PROTOCOL.Enum() + switch option.Transport { + case "TCP": + transportProtocol = mierupb.TransportProtocol_TCP.Enum() + case "UDP": + transportProtocol = mierupb.TransportProtocol_UDP.Enum() + } var server *mierupb.ServerEndpoint if net.ParseIP(option.Server) != nil { // server is an IP address @@ -284,8 +304,8 @@ func validateMieruOption(option MieruOption) error { } } - if option.Transport != "TCP" { - return fmt.Errorf("transport must be TCP") + if option.Transport != "TCP" && option.Transport != "UDP" { + return fmt.Errorf("transport must be TCP or UDP") } if option.UserName == "" { return fmt.Errorf("username is empty") diff --git a/adapter/outbound/mieru_test.go b/adapter/outbound/mieru_test.go index 086b7910..d80b2769 100644 --- a/adapter/outbound/mieru_test.go +++ b/adapter/outbound/mieru_test.go @@ -34,7 +34,7 @@ func TestNewMieru(t *testing.T) { Name: "test", Server: "example.com", Port: 10003, - Transport: "TCP", + Transport: "UDP", UserName: "test", Password: "test", }, diff --git a/docs/config.yaml b/docs/config.yaml index e71b06f4..8a5ec1f7 100644 --- a/docs/config.yaml +++ b/docs/config.yaml @@ -1027,7 +1027,7 @@ proxies: # socks5 server: 1.2.3.4 port: 2999 # port-range: 2090-2099 #(不可同时填写 port 和 port-range) - transport: TCP # 只支持 TCP + transport: TCP # 支持 TCP 或者 UDP udp: true # 支持 UDP over TCP username: user password: password diff --git a/go.mod b/go.mod index 90362be0..2047d0eb 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/bahlo/generic-list-go v0.2.0 github.com/coreos/go-iptables v0.8.0 github.com/dlclark/regexp2 v1.11.5 - github.com/enfein/mieru/v3 v3.22.1 + github.com/enfein/mieru/v3 v3.23.0 github.com/go-chi/chi/v5 v5.2.3 github.com/go-chi/render v1.0.3 github.com/gobwas/ws v1.4.0 diff --git a/go.sum b/go.sum index 0bec3612..bc29d7fe 100644 --- a/go.sum +++ b/go.sum @@ -23,8 +23,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ= github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= -github.com/enfein/mieru/v3 v3.22.1 h1:/XGYYXpEhEJlxosmtbpEJkhtRLHB8IToG7LB8kU2ZDY= -github.com/enfein/mieru/v3 v3.22.1/go.mod h1:zJBUCsi5rxyvHM8fjFf+GLaEl4OEjjBXr1s5F6Qd3hM= +github.com/enfein/mieru/v3 v3.23.0 h1:f/dd3UAoi36FD9DZ9x49t6Ps0oHeSjrVSgWzvEstn0E= +github.com/enfein/mieru/v3 v3.23.0/go.mod h1:zJBUCsi5rxyvHM8fjFf+GLaEl4OEjjBXr1s5F6Qd3hM= github.com/ericlagergren/aegis v0.0.0-20250325060835-cd0defd64358 h1:kXYqH/sL8dS/FdoFjr12ePjnLPorPo2FsnrHNuXSDyo= github.com/ericlagergren/aegis v0.0.0-20250325060835-cd0defd64358/go.mod h1:hkIFzoiIPZYxdFOOLyDho59b7SrDfo+w3h+yWdlg45I= github.com/ericlagergren/polyval v0.0.0-20220411101811-e25bc10ba391 h1:8j2RH289RJplhA6WfdaPqzg1MjH2K8wX5e0uhAxrw2g= diff --git a/listener/inbound/common_test.go b/listener/inbound/common_test.go index 72874c7b..4178035b 100644 --- a/listener/inbound/common_test.go +++ b/listener/inbound/common_test.go @@ -59,11 +59,13 @@ func init() { } type TestTunnel struct { - HandleTCPConnFn func(conn net.Conn, metadata *C.Metadata) - HandleUDPPacketFn func(packet C.UDPPacket, metadata *C.Metadata) - NatTableFn func() C.NatTable - CloseFn func() error - DoTestFn func(t *testing.T, proxy C.ProxyAdapter) + HandleTCPConnFn func(conn net.Conn, metadata *C.Metadata) + HandleUDPPacketFn func(packet C.UDPPacket, metadata *C.Metadata) + NatTableFn func() C.NatTable + CloseFn func() error + DoTestFn func(t *testing.T, proxy C.ProxyAdapter) + DoSequentialTestFn func(t *testing.T, proxy C.ProxyAdapter) + DoConcurrentTestFn func(t *testing.T, proxy C.ProxyAdapter) } func (tt *TestTunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) { @@ -86,6 +88,14 @@ func (tt *TestTunnel) DoTest(t *testing.T, proxy C.ProxyAdapter) { tt.DoTestFn(t, proxy) } +func (tt *TestTunnel) DoSequentialTest(t *testing.T, proxy C.ProxyAdapter) { + tt.DoSequentialTestFn(t, proxy) +} + +func (tt *TestTunnel) DoConcurrentTest(t *testing.T, proxy C.ProxyAdapter) { + tt.DoConcurrentTestFn(t, proxy) +} + type TestTunnelListener struct { ch chan net.Conn ctx context.Context @@ -213,6 +223,40 @@ func NewHttpTestTunnel() *TestTunnel { } assert.Equal(t, httpData[:size], data) } + + sequentialTestFn := func(t *testing.T, proxy C.ProxyAdapter) { + // Sequential testing for debugging + t.Run("Sequential", func(t *testing.T) { + testFn(t, proxy, "http", len(httpData)) + testFn(t, proxy, "https", len(httpData)) + }) + } + + concurrentTestFn := func(t *testing.T, proxy C.ProxyAdapter) { + // Concurrent testing to detect stress + t.Run("Concurrent", func(t *testing.T) { + wg := sync.WaitGroup{} + num := len(httpData) / 1024 + for i := 1; i <= num; i++ { + i := i + wg.Add(1) + go func() { + testFn(t, proxy, "https", i*1024) + defer wg.Done() + }() + } + for i := 1; i <= num; i++ { + i := i + wg.Add(1) + go func() { + testFn(t, proxy, "http", i*1024) + defer wg.Done() + }() + } + wg.Wait() + }) + } + tunnel := &TestTunnel{ HandleTCPConnFn: func(conn net.Conn, metadata *C.Metadata) { defer conn.Close() @@ -252,36 +296,11 @@ func NewHttpTestTunnel() *TestTunnel { }, CloseFn: ln.Close, DoTestFn: func(t *testing.T, proxy C.ProxyAdapter) { - - // Sequential testing for debugging - t.Run("Sequential", func(t *testing.T) { - testFn(t, proxy, "http", len(httpData)) - testFn(t, proxy, "https", len(httpData)) - }) - - // Concurrent testing to detect stress - t.Run("Concurrent", func(t *testing.T) { - wg := sync.WaitGroup{} - num := len(httpData) / 1024 - for i := 1; i <= num; i++ { - i := i - wg.Add(1) - go func() { - testFn(t, proxy, "https", i*1024) - defer wg.Done() - }() - } - for i := 1; i <= num; i++ { - i := i - wg.Add(1) - go func() { - testFn(t, proxy, "http", i*1024) - defer wg.Done() - }() - } - wg.Wait() - }) + sequentialTestFn(t, proxy) + concurrentTestFn(t, proxy) }, + DoSequentialTestFn: sequentialTestFn, + DoConcurrentTestFn: concurrentTestFn, } return tunnel } diff --git a/listener/inbound/mieru_test.go b/listener/inbound/mieru_test.go index d163b49d..12aa680c 100644 --- a/listener/inbound/mieru_test.go +++ b/listener/inbound/mieru_test.go @@ -149,12 +149,18 @@ func TestNewMieru(t *testing.T) { } func TestInboundMieru(t *testing.T) { - t.Run("HANDSHAKE_STANDARD", func(t *testing.T) { + t.Run("TCP_HANDSHAKE_STANDARD", func(t *testing.T) { testInboundMieruTCP(t, "HANDSHAKE_STANDARD") }) - t.Run("HANDSHAKE_NO_WAIT", func(t *testing.T) { + t.Run("TCP_HANDSHAKE_NO_WAIT", func(t *testing.T) { testInboundMieruTCP(t, "HANDSHAKE_NO_WAIT") }) + t.Run("UDP_HANDSHAKE_STANDARD", func(t *testing.T) { + testInboundMieruUDP(t, "HANDSHAKE_STANDARD") + }) + t.Run("UDP_HANDSHAKE_NO_WAIT", func(t *testing.T) { + testInboundMieruUDP(t, "HANDSHAKE_NO_WAIT") + }) } func testInboundMieruTCP(t *testing.T, handshakeMode string) { @@ -168,7 +174,7 @@ func testInboundMieruTCP(t *testing.T, handshakeMode string) { inboundOptions := inbound.MieruOption{ BaseOption: inbound.BaseOption{ - NameStr: "mieru_inbound", + NameStr: "mieru_inbound_tcp", Listen: "127.0.0.1", Port: strconv.Itoa(port), }, @@ -194,7 +200,7 @@ func testInboundMieruTCP(t *testing.T, handshakeMode string) { return } outboundOptions := outbound.MieruOption{ - Name: "mieru_outbound", + Name: "mieru_outbound_tcp", Server: addrPort.Addr().String(), Port: int(addrPort.Port()), Transport: "TCP", @@ -210,3 +216,57 @@ func testInboundMieruTCP(t *testing.T, handshakeMode string) { tunnel.DoTest(t, out) } + +func testInboundMieruUDP(t *testing.T, handshakeMode string) { + t.Parallel() + l, err := net.ListenPacket("udp", "127.0.0.1:0") + if !assert.NoError(t, err) { + return + } + port := l.LocalAddr().(*net.UDPAddr).Port + l.Close() + + inboundOptions := inbound.MieruOption{ + BaseOption: inbound.BaseOption{ + NameStr: "mieru_inbound_udp", + Listen: "127.0.0.1", + Port: strconv.Itoa(port), + }, + Transport: "UDP", + Users: map[string]string{"test": "password"}, + } + in, err := inbound.NewMieru(&inboundOptions) + if !assert.NoError(t, err) { + return + } + + tunnel := NewHttpTestTunnel() + defer tunnel.Close() + + err = in.Listen(tunnel) + if !assert.NoError(t, err) { + return + } + defer in.Close() + + addrPort, err := netip.ParseAddrPort(in.Address()) + if !assert.NoError(t, err) { + return + } + outboundOptions := outbound.MieruOption{ + Name: "mieru_outbound_udp", + Server: addrPort.Addr().String(), + Port: int(addrPort.Port()), + Transport: "UDP", + UserName: "test", + Password: "password", + HandshakeMode: handshakeMode, + } + out, err := outbound.NewMieru(outboundOptions) + if !assert.NoError(t, err) { + return + } + defer out.Close() + + tunnel.DoSequentialTest(t, out) +}