@@ -5,16 +5,18 @@ import (
55 "io"
66 "time"
77
8- "github.com/xtls/xray-core/app/dispatcher"
98 "github.com/xtls/xray-core/common"
109 "github.com/xtls/xray-core/common/buf"
1110 "github.com/xtls/xray-core/common/errors"
1211 "github.com/xtls/xray-core/common/log"
1312 "github.com/xtls/xray-core/common/net"
1413 "github.com/xtls/xray-core/common/protocol"
1514 "github.com/xtls/xray-core/common/session"
15+ //"github.com/xtls/xray-core/common/signal"
1616 "github.com/xtls/xray-core/common/signal/done"
17+ "github.com/xtls/xray-core/common/task"
1718 "github.com/xtls/xray-core/core"
19+ //"github.com/xtls/xray-core/features/policy"
1820 "github.com/xtls/xray-core/features/routing"
1921 "github.com/xtls/xray-core/transport"
2022 "github.com/xtls/xray-core/transport/pipe"
@@ -64,14 +66,43 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t
6466 if dest .Address != muxCoolAddress {
6567 return s .dispatcher .DispatchLink (ctx , dest , link )
6668 }
67- link = s .dispatcher .(* dispatcher.DefaultDispatcher ).WrapLink (ctx , link )
68- worker , err := NewServerWorker (ctx , s .dispatcher , link )
69+
70+ // For Mux, we need to use pipe to guard against multiple sub-connections writing back responses at the same time
71+ // sessionPolicy = h.policyManager.ForLevel(request.User.Level)
72+ // ctx, cancel := context.WithCancel(ctx)
73+ // timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
74+ // ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
75+ opts := pipe .OptionsFromContext (ctx )
76+ uplinkReader , uplinkWriter := pipe .New (opts ... )
77+ downlinkReader , downlinkWriter := pipe .New (opts ... )
78+
79+ _ , err := NewServerWorker (ctx , s .dispatcher , & transport.Link {
80+ Reader : uplinkReader ,
81+ Writer : downlinkWriter ,
82+ })
6983 if err != nil {
7084 return err
7185 }
72- select {
73- case <- ctx .Done ():
74- case <- worker .done .Wait ():
86+ inboundLink := & transport.Link {Reader : downlinkReader , Writer : uplinkWriter }
87+ requestDone := func () error {
88+ //defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
89+ if err := buf .Copy (link .Reader , inboundLink .Writer ); err != nil {
90+ return errors .New ("failed to transfer request" ).Base (err )
91+ }
92+ return nil
93+ }
94+ responseDone := func () error {
95+ //defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
96+ if err := buf .Copy (inboundLink .Reader , link .Writer ); err != nil {
97+ return err
98+ }
99+ return nil
100+ }
101+ requestDonePost := task .OnSuccess (requestDone , task .Close (inboundLink .Writer ))
102+ if err := task .Run (ctx , requestDonePost , responseDone ); err != nil {
103+ common .Interrupt (inboundLink .Reader )
104+ common .Interrupt (inboundLink .Writer )
105+ return errors .New ("connection ends" ).Base (err )
75106 }
76107 return nil
77108}
0 commit comments