feat: support mieru UDP outbound (#2384)
Some checks are pending
Test / test (1.20, macos-15-intel) (push) Waiting to run
Test / test (1.20, macos-latest) (push) Waiting to run
Test / test (1.20, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.20, ubuntu-latest) (push) Waiting to run
Test / test (1.20, windows-latest) (push) Waiting to run
Test / test (1.21, macos-15-intel) (push) Waiting to run
Test / test (1.21, macos-latest) (push) Waiting to run
Test / test (1.21, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.21, ubuntu-latest) (push) Waiting to run
Test / test (1.21, windows-latest) (push) Waiting to run
Test / test (1.22, macos-15-intel) (push) Waiting to run
Test / test (1.22, macos-latest) (push) Waiting to run
Test / test (1.22, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.22, ubuntu-latest) (push) Waiting to run
Test / test (1.22, windows-latest) (push) Waiting to run
Test / test (1.23, macos-15-intel) (push) Waiting to run
Test / test (1.23, macos-latest) (push) Waiting to run
Test / test (1.23, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.23, ubuntu-latest) (push) Waiting to run
Test / test (1.23, windows-latest) (push) Waiting to run
Test / test (1.24, macos-15-intel) (push) Waiting to run
Test / test (1.24, macos-latest) (push) Waiting to run
Test / test (1.24, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.24, ubuntu-latest) (push) Waiting to run
Test / test (1.24, windows-latest) (push) Waiting to run
Test / test (1.25, macos-15-intel) (push) Waiting to run
Test / test (1.25, macos-latest) (push) Waiting to run
Test / test (1.25, ubuntu-24.04-arm) (push) Waiting to run
Test / test (1.25, ubuntu-latest) (push) Waiting to run
Test / test (1.25, windows-latest) (push) Waiting to run
Trigger CMFA Update / trigger-CMFA-update (push) Waiting to run

This commit is contained in:
enfein 2025-11-22 00:54:14 +00:00 committed by GitHub
parent c107c6a824
commit 5aa140c493
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 157 additions and 58 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"strconv"
"sync"
@ -40,6 +41,20 @@ type MieruOption struct {
HandshakeMode string `proxy:"handshake-mode,omitempty"`
}
type mieruPacketDialer struct {
C.Dialer
}
var _ mierucommon.PacketDialer = (*mieruPacketDialer)(nil)
func (pd mieruPacketDialer) ListenPacket(ctx context.Context, network, laddr, raddr string) (net.PacketConn, error) {
rAddrPort, err := netip.ParseAddrPort(raddr)
if err != nil {
return nil, fmt.Errorf("invalid address %s: %w", raddr, err)
}
return pd.Dialer.ListenPacket(ctx, network, laddr, rAddrPort)
}
// DialContext implements C.ProxyAdapter
func (m *Mieru) DialContext(ctx context.Context, metadata *C.Metadata) (C.Conn, error) {
if err := m.ensureClientIsRunning(); err != nil {
@ -102,6 +117,7 @@ func (m *Mieru) ensureClientIsRunning() error {
return err
}
config.Dialer = dialer
config.PacketDialer = mieruPacketDialer{Dialer: dialer}
if err := m.client.Store(config); err != nil {
return err
}
@ -158,23 +174,21 @@ func (m *Mieru) Close() error {
}
func metadataToMieruNetAddrSpec(metadata *C.Metadata) mierumodel.NetAddrSpec {
spec := mierumodel.NetAddrSpec{
Net: metadata.NetWork.String(),
}
if metadata.Host != "" {
return mierumodel.NetAddrSpec{
AddrSpec: mierumodel.AddrSpec{
FQDN: metadata.Host,
Port: int(metadata.DstPort),
},
Net: "tcp",
spec.AddrSpec = mierumodel.AddrSpec{
FQDN: metadata.Host,
Port: int(metadata.DstPort),
}
} else {
return mierumodel.NetAddrSpec{
AddrSpec: mierumodel.AddrSpec{
IP: metadata.DstIP.AsSlice(),
Port: int(metadata.DstPort),
},
Net: "tcp",
spec.AddrSpec = mierumodel.AddrSpec{
IP: metadata.DstIP.AsSlice(),
Port: int(metadata.DstPort),
}
}
return spec
}
func buildMieruClientConfig(option MieruOption) (*mieruclient.ClientConfig, error) {
@ -182,7 +196,13 @@ func buildMieruClientConfig(option MieruOption) (*mieruclient.ClientConfig, erro
return nil, fmt.Errorf("failed to validate mieru option: %w", err)
}
transportProtocol := mierupb.TransportProtocol_TCP.Enum()
var transportProtocol = mierupb.TransportProtocol_UNKNOWN_TRANSPORT_PROTOCOL.Enum()
switch option.Transport {
case "TCP":
transportProtocol = mierupb.TransportProtocol_TCP.Enum()
case "UDP":
transportProtocol = mierupb.TransportProtocol_UDP.Enum()
}
var server *mierupb.ServerEndpoint
if net.ParseIP(option.Server) != nil {
// server is an IP address
@ -284,8 +304,8 @@ func validateMieruOption(option MieruOption) error {
}
}
if option.Transport != "TCP" {
return fmt.Errorf("transport must be TCP")
if option.Transport != "TCP" && option.Transport != "UDP" {
return fmt.Errorf("transport must be TCP or UDP")
}
if option.UserName == "" {
return fmt.Errorf("username is empty")

View File

@ -34,7 +34,7 @@ func TestNewMieru(t *testing.T) {
Name: "test",
Server: "example.com",
Port: 10003,
Transport: "TCP",
Transport: "UDP",
UserName: "test",
Password: "test",
},

View File

@ -1027,7 +1027,7 @@ proxies: # socks5
server: 1.2.3.4
port: 2999
# port-range: 2090-2099 #(不可同时填写 port 和 port-range
transport: TCP # 支持 TCP
transport: TCP # 支持 TCP 或者 UDP
udp: true # 支持 UDP over TCP
username: user
password: password

2
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/bahlo/generic-list-go v0.2.0
github.com/coreos/go-iptables v0.8.0
github.com/dlclark/regexp2 v1.11.5
github.com/enfein/mieru/v3 v3.22.1
github.com/enfein/mieru/v3 v3.23.0
github.com/go-chi/chi/v5 v5.2.3
github.com/go-chi/render v1.0.3
github.com/gobwas/ws v1.4.0

4
go.sum
View File

@ -23,8 +23,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ=
github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
github.com/enfein/mieru/v3 v3.22.1 h1:/XGYYXpEhEJlxosmtbpEJkhtRLHB8IToG7LB8kU2ZDY=
github.com/enfein/mieru/v3 v3.22.1/go.mod h1:zJBUCsi5rxyvHM8fjFf+GLaEl4OEjjBXr1s5F6Qd3hM=
github.com/enfein/mieru/v3 v3.23.0 h1:f/dd3UAoi36FD9DZ9x49t6Ps0oHeSjrVSgWzvEstn0E=
github.com/enfein/mieru/v3 v3.23.0/go.mod h1:zJBUCsi5rxyvHM8fjFf+GLaEl4OEjjBXr1s5F6Qd3hM=
github.com/ericlagergren/aegis v0.0.0-20250325060835-cd0defd64358 h1:kXYqH/sL8dS/FdoFjr12ePjnLPorPo2FsnrHNuXSDyo=
github.com/ericlagergren/aegis v0.0.0-20250325060835-cd0defd64358/go.mod h1:hkIFzoiIPZYxdFOOLyDho59b7SrDfo+w3h+yWdlg45I=
github.com/ericlagergren/polyval v0.0.0-20220411101811-e25bc10ba391 h1:8j2RH289RJplhA6WfdaPqzg1MjH2K8wX5e0uhAxrw2g=

View File

@ -59,11 +59,13 @@ func init() {
}
type TestTunnel struct {
HandleTCPConnFn func(conn net.Conn, metadata *C.Metadata)
HandleUDPPacketFn func(packet C.UDPPacket, metadata *C.Metadata)
NatTableFn func() C.NatTable
CloseFn func() error
DoTestFn func(t *testing.T, proxy C.ProxyAdapter)
HandleTCPConnFn func(conn net.Conn, metadata *C.Metadata)
HandleUDPPacketFn func(packet C.UDPPacket, metadata *C.Metadata)
NatTableFn func() C.NatTable
CloseFn func() error
DoTestFn func(t *testing.T, proxy C.ProxyAdapter)
DoSequentialTestFn func(t *testing.T, proxy C.ProxyAdapter)
DoConcurrentTestFn func(t *testing.T, proxy C.ProxyAdapter)
}
func (tt *TestTunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) {
@ -86,6 +88,14 @@ func (tt *TestTunnel) DoTest(t *testing.T, proxy C.ProxyAdapter) {
tt.DoTestFn(t, proxy)
}
func (tt *TestTunnel) DoSequentialTest(t *testing.T, proxy C.ProxyAdapter) {
tt.DoSequentialTestFn(t, proxy)
}
func (tt *TestTunnel) DoConcurrentTest(t *testing.T, proxy C.ProxyAdapter) {
tt.DoConcurrentTestFn(t, proxy)
}
type TestTunnelListener struct {
ch chan net.Conn
ctx context.Context
@ -213,6 +223,40 @@ func NewHttpTestTunnel() *TestTunnel {
}
assert.Equal(t, httpData[:size], data)
}
sequentialTestFn := func(t *testing.T, proxy C.ProxyAdapter) {
// Sequential testing for debugging
t.Run("Sequential", func(t *testing.T) {
testFn(t, proxy, "http", len(httpData))
testFn(t, proxy, "https", len(httpData))
})
}
concurrentTestFn := func(t *testing.T, proxy C.ProxyAdapter) {
// Concurrent testing to detect stress
t.Run("Concurrent", func(t *testing.T) {
wg := sync.WaitGroup{}
num := len(httpData) / 1024
for i := 1; i <= num; i++ {
i := i
wg.Add(1)
go func() {
testFn(t, proxy, "https", i*1024)
defer wg.Done()
}()
}
for i := 1; i <= num; i++ {
i := i
wg.Add(1)
go func() {
testFn(t, proxy, "http", i*1024)
defer wg.Done()
}()
}
wg.Wait()
})
}
tunnel := &TestTunnel{
HandleTCPConnFn: func(conn net.Conn, metadata *C.Metadata) {
defer conn.Close()
@ -252,36 +296,11 @@ func NewHttpTestTunnel() *TestTunnel {
},
CloseFn: ln.Close,
DoTestFn: func(t *testing.T, proxy C.ProxyAdapter) {
// Sequential testing for debugging
t.Run("Sequential", func(t *testing.T) {
testFn(t, proxy, "http", len(httpData))
testFn(t, proxy, "https", len(httpData))
})
// Concurrent testing to detect stress
t.Run("Concurrent", func(t *testing.T) {
wg := sync.WaitGroup{}
num := len(httpData) / 1024
for i := 1; i <= num; i++ {
i := i
wg.Add(1)
go func() {
testFn(t, proxy, "https", i*1024)
defer wg.Done()
}()
}
for i := 1; i <= num; i++ {
i := i
wg.Add(1)
go func() {
testFn(t, proxy, "http", i*1024)
defer wg.Done()
}()
}
wg.Wait()
})
sequentialTestFn(t, proxy)
concurrentTestFn(t, proxy)
},
DoSequentialTestFn: sequentialTestFn,
DoConcurrentTestFn: concurrentTestFn,
}
return tunnel
}

View File

@ -149,12 +149,18 @@ func TestNewMieru(t *testing.T) {
}
func TestInboundMieru(t *testing.T) {
t.Run("HANDSHAKE_STANDARD", func(t *testing.T) {
t.Run("TCP_HANDSHAKE_STANDARD", func(t *testing.T) {
testInboundMieruTCP(t, "HANDSHAKE_STANDARD")
})
t.Run("HANDSHAKE_NO_WAIT", func(t *testing.T) {
t.Run("TCP_HANDSHAKE_NO_WAIT", func(t *testing.T) {
testInboundMieruTCP(t, "HANDSHAKE_NO_WAIT")
})
t.Run("UDP_HANDSHAKE_STANDARD", func(t *testing.T) {
testInboundMieruUDP(t, "HANDSHAKE_STANDARD")
})
t.Run("UDP_HANDSHAKE_NO_WAIT", func(t *testing.T) {
testInboundMieruUDP(t, "HANDSHAKE_NO_WAIT")
})
}
func testInboundMieruTCP(t *testing.T, handshakeMode string) {
@ -168,7 +174,7 @@ func testInboundMieruTCP(t *testing.T, handshakeMode string) {
inboundOptions := inbound.MieruOption{
BaseOption: inbound.BaseOption{
NameStr: "mieru_inbound",
NameStr: "mieru_inbound_tcp",
Listen: "127.0.0.1",
Port: strconv.Itoa(port),
},
@ -194,7 +200,7 @@ func testInboundMieruTCP(t *testing.T, handshakeMode string) {
return
}
outboundOptions := outbound.MieruOption{
Name: "mieru_outbound",
Name: "mieru_outbound_tcp",
Server: addrPort.Addr().String(),
Port: int(addrPort.Port()),
Transport: "TCP",
@ -210,3 +216,57 @@ func testInboundMieruTCP(t *testing.T, handshakeMode string) {
tunnel.DoTest(t, out)
}
func testInboundMieruUDP(t *testing.T, handshakeMode string) {
t.Parallel()
l, err := net.ListenPacket("udp", "127.0.0.1:0")
if !assert.NoError(t, err) {
return
}
port := l.LocalAddr().(*net.UDPAddr).Port
l.Close()
inboundOptions := inbound.MieruOption{
BaseOption: inbound.BaseOption{
NameStr: "mieru_inbound_udp",
Listen: "127.0.0.1",
Port: strconv.Itoa(port),
},
Transport: "UDP",
Users: map[string]string{"test": "password"},
}
in, err := inbound.NewMieru(&inboundOptions)
if !assert.NoError(t, err) {
return
}
tunnel := NewHttpTestTunnel()
defer tunnel.Close()
err = in.Listen(tunnel)
if !assert.NoError(t, err) {
return
}
defer in.Close()
addrPort, err := netip.ParseAddrPort(in.Address())
if !assert.NoError(t, err) {
return
}
outboundOptions := outbound.MieruOption{
Name: "mieru_outbound_udp",
Server: addrPort.Addr().String(),
Port: int(addrPort.Port()),
Transport: "UDP",
UserName: "test",
Password: "password",
HandshakeMode: handshakeMode,
}
out, err := outbound.NewMieru(outboundOptions)
if !assert.NoError(t, err) {
return
}
defer out.Close()
tunnel.DoSequentialTest(t, out)
}