diff --git a/client/control.go b/client/control.go index c8d186ca..1c11bffa 100644 --- a/client/control.go +++ b/client/control.go @@ -29,6 +29,7 @@ import ( "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/transport" utilnet "github.com/fatedier/frp/pkg/util/net" + "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/wait" "github.com/fatedier/frp/pkg/util/xlog" ) @@ -104,6 +105,9 @@ func NewControl( ctl.msgDispatcher = msg.NewDispatcher(cryptoRW) ctl.registerMsgHandlers() + + ctl.xl.Info("get pxy cfgs: %v", util.JSONDump(ctl.pxyCfgs)) + ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel()) ctl.pm = proxy.NewManager(ctl.ctx, clientCfg, ctl.msgTransporter) @@ -133,10 +137,12 @@ func (ctl *Control) handleReqWorkConn(_ msg.Message) { m := &msg.NewWorkConn{ RunID: ctl.runID, } + if err = ctl.authSetter.SetNewWorkConn(m); err != nil { xl.Warn("error during NewWorkConn authentication: %v", err) return } + if err = msg.WriteMsg(workConn, m); err != nil { xl.Warn("work connection write to server error: %v", err) workConn.Close() @@ -149,6 +155,7 @@ func (ctl *Control) handleReqWorkConn(_ msg.Message) { workConn.Close() return } + if startMsg.Error != "" { xl.Error("StartWorkConn contains error: %s", startMsg.Error) workConn.Close() @@ -161,7 +168,11 @@ func (ctl *Control) handleReqWorkConn(_ msg.Message) { func (ctl *Control) handleNewProxyResp(m msg.Message) { xl := ctl.xl + inMsg := m.(*msg.NewProxyResp) + + xl.Info("proxy: %v, remote addr: %v, err: %v", inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error) + // Server will return NewProxyResp message to each NewProxy message. // Start a new proxy handler if no error got err := ctl.pm.StartProxy(inMsg.ProxyName, inMsg.RemoteAddr, inMsg.Error) diff --git a/client/proxy/proxy_wrapper.go b/client/proxy/proxy_wrapper.go index 346c6d07..d539a835 100644 --- a/client/proxy/proxy_wrapper.go +++ b/client/proxy/proxy_wrapper.go @@ -199,6 +199,7 @@ func (pw *Wrapper) checkWorker() { var newProxyMsg msg.NewProxy pw.Cfg.MarshalToMsg(&newProxyMsg) pw.lastSendStartMsg = now + _ = pw.handler(&event.StartProxyPayload{ NewProxyMsg: &newProxyMsg, }) diff --git a/go.mod b/go.mod index 8d27e522..92c638b5 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/samber/lo v1.38.1 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 + golang.org/x/crypto v0.14.0 golang.org/x/net v0.17.0 golang.org/x/oauth2 v0.10.0 golang.org/x/sync v0.3.0 @@ -64,7 +65,6 @@ require ( github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b // indirect github.com/tjfoc/gmsm v1.4.1 // indirect - golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum index af509c3f..8e2c09c5 100644 --- a/go.sum +++ b/go.sum @@ -216,6 +216,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/pkg/config/legacy/server.go b/pkg/config/legacy/server.go index 797770a3..d3cfcb2f 100644 --- a/pkg/config/legacy/server.go +++ b/pkg/config/legacy/server.go @@ -42,6 +42,7 @@ type ServerCommonConf struct { // BindPort specifies the port that the server listens on. By default, this // value is 7000. BindPort int `ini:"bind_port" json:"bind_port"` + // KCPBindPort specifies the KCP port that the server listens on. If this // value is 0, the server will not listen for KCP connections. By default, // this value is 0. diff --git a/pkg/config/v1/server.go b/pkg/config/v1/server.go index c42c3eca..a07c58e8 100644 --- a/pkg/config/v1/server.go +++ b/pkg/config/v1/server.go @@ -21,6 +21,12 @@ import ( "github.com/fatedier/frp/pkg/util/util" ) +type SSHGatewayConfig struct { + SSHBindPort int `json:"sshBindPort,omitempty" validate:"gte=0,lte=65535"` + SSHPrivateKeyFilePath string `json:"sshPrivateKeyFilePath,omitempty"` + SSHPublicKeyFilesPath string `json:"sshPublicKeyFilesPath,omitempty"` +} + type ServerConfig struct { APIMetadata @@ -31,6 +37,9 @@ type ServerConfig struct { // BindPort specifies the port that the server listens on. By default, this // value is 7000. BindPort int `json:"bindPort,omitempty"` + + SSHGatewayConfig SSHGatewayConfig `json:"sshGatewayConfig"` + // KCPBindPort specifies the KCP port that the server listens on. If this // value is 0, the server will not listen for KCP connections. KCPBindPort int `json:"kcpBindPort,omitempty"` diff --git a/pkg/config/v1/ssh.go b/pkg/config/v1/ssh.go new file mode 100644 index 00000000..d0f96c5c --- /dev/null +++ b/pkg/config/v1/ssh.go @@ -0,0 +1,6 @@ +package v1 + +const ( + // custom define + SSHClientLoginUserPrefix = "_frpc_ssh_client_" +) diff --git a/pkg/util/util/util.go b/pkg/util/util/util.go index d2562b01..d80d226a 100644 --- a/pkg/util/util/util.go +++ b/pkg/util/util/util.go @@ -19,6 +19,7 @@ import ( "crypto/rand" "crypto/subtle" "encoding/hex" + "encoding/json" "fmt" mathrand "math/rand" "net" @@ -144,3 +145,8 @@ func RandomSleep(duration time.Duration, minRatio, maxRatio float64) time.Durati func ConstantTimeEqString(a, b string) bool { return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1 } + +func JSONDump(v interface{}) string { + prettyJSON, _ := json.MarshalIndent(v, "", "\t") + return string(prettyJSON) +} diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index fe6f781b..5ea99f1e 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -21,6 +21,7 @@ import ( "net" "reflect" "strconv" + "strings" "sync" "time" @@ -229,8 +230,14 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) { return } + var workConn net.Conn + // try all connections from the pool - workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) + if strings.HasPrefix(pxy.GetLoginMsg().User, v1.SSHClientLoginUserPrefix) { + workConn, err = pxy.getWorkConnFn() + } else { + workConn, err = pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) + } if err != nil { return } diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 4196ad4a..69cdc70b 100644 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -73,6 +73,7 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) { pxy.rc.TCPPortManager.Release(pxy.realBindPort) } }() + listener, errRet := net.Listen("tcp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realBindPort))) if errRet != nil { err = errRet diff --git a/server/service.go b/server/service.go index 2629b345..9c20d68b 100644 --- a/server/service.go +++ b/server/service.go @@ -22,6 +22,7 @@ import ( "io" "net" "net/http" + "os" "strconv" "time" @@ -29,6 +30,7 @@ import ( fmux "github.com/hashicorp/yamux" quic "github.com/quic-go/quic-go" "github.com/samber/lo" + "golang.org/x/crypto/ssh" "github.com/fatedier/frp/assets" "github.com/fatedier/frp/pkg/auth" @@ -66,6 +68,10 @@ type Service struct { // Accept connections from client listener net.Listener + // Accept connections using ssh + sshListener net.Listener + sshConfig *ssh.ServerConfig + // Accept connections using kcp kcpListener net.Listener @@ -199,6 +205,56 @@ func NewService(cfg *v1.ServerConfig) (svr *Service, err error) { svr.listener = ln log.Info("frps tcp listen on %s", address) + if cfg.SSHGatewayConfig.SSHBindPort > 0 { + svr.sshConfig = &ssh.ServerConfig{ + PublicKeyCallback: func(conn ssh.ConnMetadata, key ssh.PublicKey) (*ssh.Permissions, error) { + file := fmt.Sprintf("%v/%v.pub", cfg.SSHGatewayConfig.SSHPublicKeyFilesPath, ssh.FingerprintSHA256(key)) + authorizedKey, err := os.ReadFile(file) + if err != nil { + return nil, err + } + + parsedAuthorizedKey, _, _, _, err := ssh.ParseAuthorizedKey(authorizedKey) + if err != nil { + return nil, err + } + + if key.Type() == parsedAuthorizedKey.Type() && bytes.Equal(key.Marshal(), parsedAuthorizedKey.Marshal()) { + log.Info("ssh file: %v fingerprint sha256: %v", file, ssh.FingerprintSHA256(key)) + + return &ssh.Permissions{ + Extensions: map[string]string{ + ssh.FingerprintSHA256(key): string(authorizedKey), + }, + }, nil + } + return nil, fmt.Errorf("unknown public key for %q", conn.User()) + }, + } + + privateBytes, err := os.ReadFile(cfg.SSHGatewayConfig.SSHPrivateKeyFilePath) + if err != nil { + log.Error("Failed to load private key") + return nil, err + } + + private, err := ssh.ParsePrivateKey(privateBytes) + if err != nil { + log.Error("Failed to parse private key, error: %v", err) + return nil, err + } + + svr.sshConfig.AddHostKey(private) + + sshAddr := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.SSHGatewayConfig.SSHBindPort)) + svr.sshListener, err = net.Listen("tcp", sshAddr) + if err != nil { + log.Error("Failed to listen on %v, error: %v", sshAddr, err) + return nil, err + } + log.Info("ssh server listening on %v", sshAddr) + } + // Listen for accepting connections from client using kcp protocol. if cfg.KCPBindPort > 0 { address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.KCPBindPort)) @@ -326,6 +382,10 @@ func (svr *Service) Run(ctx context.Context) { svr.ctx = ctx svr.cancel = cancel + if svr.sshListener != nil { + go svr.HandleSSHListener(svr.sshListener) + } + if svr.kcpListener != nil { go svr.HandleListener(svr.kcpListener) } @@ -348,6 +408,10 @@ func (svr *Service) Run(ctx context.Context) { } func (svr *Service) Close() error { + if svr.sshListener != nil { + svr.sshListener.Close() + svr.sshListener = nil + } if svr.kcpListener != nil { svr.kcpListener.Close() svr.kcpListener = nil @@ -493,6 +557,60 @@ func (svr *Service) HandleListener(l net.Listener) { } } +func (svr *Service) HandleSSHListener(listener net.Listener) { + for { + tcpConn, err := listener.Accept() + if err != nil { + log.Error("failed to accept incoming ssh connection (%s)", err) + return + } + log.Info("new tcp conn connected: %v", tcpConn.RemoteAddr().String()) + + pxyPayloadCh := make(chan v1.ProxyConfigurer) + replyCh := make(chan interface{}) + + ss, err := NewSSHService(tcpConn, svr.sshConfig, pxyPayloadCh, replyCh) + if err != nil { + log.Error("new ssh service error: %v", err) + continue + } + ss.Run() + + go func() { + for { + pxyCfg := <-pxyPayloadCh + + ctx := context.Background() + + vs, err := NewVirtualService( + ctx, + v1.ClientCommonConfig{}, + *svr.cfg, + msg.Login{ + User: v1.SSHClientLoginUserPrefix + tcpConn.RemoteAddr().String(), + }, + svr.rc, + pxyCfg, + ss, + replyCh, + ) + if err != nil { + log.Error("new virtual service error: %v", err) + ss.Close() + return + } + + err = vs.Run(ctx) + if err != nil { + log.Error("proxy run error: %v", err) + vs.Close() + return + } + } + }() + } +} + func (svr *Service) HandleQUICListener(l *quic.Listener) { // Listen for incoming connections from client. for { diff --git a/server/ssh_service.go b/server/ssh_service.go new file mode 100644 index 00000000..c9c78c19 --- /dev/null +++ b/server/ssh_service.go @@ -0,0 +1,484 @@ +package server + +import ( + "encoding/binary" + "errors" + "flag" + "fmt" + "io" + "net" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + gerror "github.com/fatedier/golib/errors" + "golang.org/x/crypto/ssh" + + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/util/log" + "github.com/fatedier/frp/pkg/util/util" +) + +const ( + // ssh protocol define + // https://datatracker.ietf.org/doc/html/rfc4254#page-16 + ChannelTypeServerOpenChannel = "forwarded-tcpip" + RequestTypeForward = "tcpip-forward" + + // golang ssh package define. + // https://pkg.go.dev/golang.org/x/crypto/ssh + RequestTypeHeartbeat = "keepalive@openssh.com" +) + +// 当 proxy 失败会返回该错误 +type VProxyError struct{} + +// ssh protocol define +// https://datatracker.ietf.org/doc/html/rfc4254#page-16 +// parse ssh client cmds input +type forwardedTCPPayload struct { + Addr string + Port uint32 + + // can be default empty value but do not delete it + // because ssh protocol shoule be reserved + OriginAddr string + OriginPort uint32 +} + +// custom define +// parse ssh client cmds input +type SSHCmdPayload struct { + Address string + Port uint32 +} + +// custom define +// with frp control cmds +type SSHExtraPayload struct { + Type string + + // TODO port can be set by extra message and priority to ssh raw cmd + Address string + Port uint32 +} + +type SSHService struct { + tcpConn net.Conn + cfg *ssh.ServerConfig + + sshConn *ssh.ServerConn + gChannel <-chan ssh.NewChannel + gReq <-chan *ssh.Request + + addrPayloadCh chan SSHCmdPayload + extraPayloadCh chan SSHExtraPayload + + proxyPayloadCh chan v1.ProxyConfigurer + replyCh chan interface{} + + closeCh chan struct{} + exit int32 +} + +func NewSSHService( + tcpConn net.Conn, + cfg *ssh.ServerConfig, + proxyPayloadCh chan v1.ProxyConfigurer, + replyCh chan interface{}, +) (ss *SSHService, err error) { + ss = &SSHService{ + tcpConn: tcpConn, + cfg: cfg, + + addrPayloadCh: make(chan SSHCmdPayload), + extraPayloadCh: make(chan SSHExtraPayload), + + proxyPayloadCh: proxyPayloadCh, + replyCh: replyCh, + + closeCh: make(chan struct{}), + exit: 0, + } + + ss.sshConn, ss.gChannel, ss.gReq, err = ssh.NewServerConn(tcpConn, cfg) + if err != nil { + log.Error("ssh handshake error: %v", err) + return nil, err + } + + log.Info("ssh connection success") + + return ss, nil +} + +func (ss *SSHService) Run() { + go ss.loopGenerateProxy() + go ss.loopParseCmdPayload() + go ss.loopParseExtraPayload() + go ss.loopReply() +} + +func (ss *SSHService) Exit() <-chan struct{} { + return ss.closeCh +} + +func (ss *SSHService) Close() { + if atomic.LoadInt32(&ss.exit) == 1 { + return + } + + select { + case <-ss.closeCh: + return + default: + } + + close(ss.closeCh) + close(ss.addrPayloadCh) + close(ss.extraPayloadCh) + + _ = ss.sshConn.Wait() + + ss.sshConn.Close() + ss.tcpConn.Close() + + atomic.StoreInt32(&ss.exit, 1) + + log.Info("ssh service close") +} + +func (ss *SSHService) loopParseCmdPayload() { + for { + select { + case req, ok := <-ss.gReq: + if !ok { + log.Info("global request is close") + ss.Close() + return + } + + switch req.Type { + case RequestTypeForward: + var addrPayload SSHCmdPayload + if err := ssh.Unmarshal(req.Payload, &addrPayload); err != nil { + log.Error("ssh unmarshal error: %v", err) + return + } + _ = gerror.PanicToError(func() { + ss.addrPayloadCh <- addrPayload + }) + default: + if req.Type == RequestTypeHeartbeat { + log.Debug("ssh heartbeat data") + } else { + log.Info("default req, data: %v", util.JSONDump(req)) + } + } + if req.WantReply { + err := req.Reply(true, nil) + if err != nil { + log.Error("reply to ssh client error: %v", err) + } + } + case <-ss.closeCh: + log.Info("loop parse cmd payload close") + return + } + } +} + +func (ss *SSHService) loopSendHeartbeat(ch ssh.Channel) { + tk := time.NewTicker(time.Second * 60) + defer tk.Stop() + + for { + select { + case <-tk.C: + ok, err := ch.SendRequest("heartbeat", false, nil) + if err != nil { + log.Error("channel send req error: %v", err) + if err == io.EOF { + ss.Close() + return + } + continue + } + log.Debug("heartbeat send success, ok: %v", ok) + case <-ss.closeCh: + return + } + } +} + +func (ss *SSHService) loopParseExtraPayload() { + log.Info("loop parse extra payload start") + + for newChannel := range ss.gChannel { + ch, req, err := newChannel.Accept() + if err != nil { + log.Error("channel accept error: %v", err) + return + } + + go ss.loopSendHeartbeat(ch) + + go func(req <-chan *ssh.Request) { + for r := range req { + if len(r.Payload) <= 4 { + log.Info("r.payload is less than 4") + continue + } + + dataLen := binary.BigEndian.Uint32(r.Payload[:4]) + p := string(r.Payload[4 : 4+dataLen]) + + if !strings.Contains(p, "frpc") { + log.Info("payload not contains frp keyword: %v", p) + continue + } + + msg, err := parseSSHExtraMessage(p) + if err != nil { + log.Error("parse ssh extra message error: %v, payload: %v", err, r.Payload) + continue + } + _ = gerror.PanicToError(func() { + ss.extraPayloadCh <- msg + }) + return + } + }(req) + } +} + +func (ss *SSHService) SSHConn() *ssh.ServerConn { + return ss.sshConn +} + +func (ss *SSHService) TCPConn() net.Conn { + return ss.tcpConn +} + +func (ss *SSHService) loopReply() { + for { + select { + case <-ss.closeCh: + log.Info("loop reply close") + return + case req := <-ss.replyCh: + switch req.(type) { + case *VProxyError: + log.Error("run frp proxy error, close ssh service") + ss.Close() + default: + // TODO + } + } + } +} + +func (ss *SSHService) loopGenerateProxy() { + log.Info("loop generate proxy start") + + for { + if atomic.LoadInt32(&ss.exit) == 1 { + return + } + + wg := new(sync.WaitGroup) + wg.Add(2) + + var p1 SSHCmdPayload + var p2 SSHExtraPayload + + go func() { + defer wg.Done() + for { + select { + case <-ss.closeCh: + return + case p1 = <-ss.addrPayloadCh: + return + } + } + }() + + go func() { + defer wg.Done() + for { + select { + case <-ss.closeCh: + return + case p2 = <-ss.extraPayloadCh: + return + } + } + }() + + wg.Wait() + + if atomic.LoadInt32(&ss.exit) == 1 { + return + } + + switch p2.Type { + case "http": + case "tcp": + ss.proxyPayloadCh <- &v1.TCPProxyConfig{ + ProxyBaseConfig: v1.ProxyBaseConfig{ + Name: fmt.Sprintf("ssh-proxy-%v-%v", ss.tcpConn.RemoteAddr().String(), time.Now().UnixNano()), + Type: p2.Type, + }, + RemotePort: int(p1.Port), + } + default: + log.Warn("invalid frp proxy type: %v", p2.Type) + } + + } +} + +func parseSSHExtraMessage(s string) (p SSHExtraPayload, err error) { + ss := strings.Fields(s) + if len(ss) <= 1 { + return p, fmt.Errorf("invalid ssh input, args: %v", ss) + } + + for i, v := range ss { + ss[i] = strings.TrimSpace(v) + } + + if ss[0] != "frpc" { + return p, fmt.Errorf("first input should be frpc, but got: %v", ss[0]) + } + + if ss[1] != "tcp" && ss[1] != "http" { + return p, fmt.Errorf("only support tcp/http now") + } + + switch ss[1] { + case "tcp": + tcpCmd, err := ParseTCPCommand(ss) + if err != nil { + return SSHExtraPayload{}, fmt.Errorf("invalid ssh input: %v", err) + } + + port, _ := strconv.Atoi(tcpCmd.Port) + + p = SSHExtraPayload{ + Type: "tcp", + Address: tcpCmd.Address, + Port: uint32(port), + } + case "http": + httpCmd, err := ParseHTTPCommand(ss) + if err != nil { + return SSHExtraPayload{}, fmt.Errorf("invalid ssh input: %v", err) + } + + _ = httpCmd + + p = SSHExtraPayload{ + Type: "http", + } + } + + return p, nil +} + +type HTTPCommand struct { + Domain string + BasicAuthUser string + BasicAuthPass string +} + +func ParseHTTPCommand(params []string) (*HTTPCommand, error) { + if len(params) < 2 { + return nil, errors.New("invalid HTTP command") + } + + var ( + basicAuth string + domainURL string + basicAuthUser string + basicAuthPass string + ) + + fs := flag.NewFlagSet("frpc http", flag.ContinueOnError) + fs.StringVar(&basicAuth, "basic-auth", "", "") + fs.StringVar(&domainURL, "domain", "", "") + + fs.SetOutput(&nullWriter{}) // Disables usage output + + err := fs.Parse(params[2:]) + if err != nil { + if !errors.Is(err, flag.ErrHelp) { + return nil, err + } + } + + if basicAuth != "" { + authParts := strings.SplitN(basicAuth, ":", 2) + basicAuthUser = authParts[0] + if len(authParts) > 1 { + basicAuthPass = authParts[1] + } + } + + httpCmd := &HTTPCommand{ + Domain: domainURL, + BasicAuthUser: basicAuthUser, + BasicAuthPass: basicAuthPass, + } + return httpCmd, nil +} + +type TCPCommand struct { + Address string + Port string +} + +func ParseTCPCommand(params []string) (*TCPCommand, error) { + if len(params) < 2 || params[0] != "frpc" || params[1] != "tcp" { + return nil, errors.New("invalid TCP command") + } + + var ( + address string + port string + ) + + fs := flag.NewFlagSet("frpc tcp", flag.ContinueOnError) + fs.StringVar(&address, "address", "", "The IP address to listen on") + fs.StringVar(&port, "port", "", "The port to listen on") + fs.SetOutput(&nullWriter{}) // Disables usage output + + args := params[2:] + err := fs.Parse(args) + if err != nil { + if !errors.Is(err, flag.ErrHelp) { + return nil, err + } + } + + parsedAddr, err := net.ResolveIPAddr("ip", address) + if err != nil { + return nil, err + } + if _, err := net.LookupPort("tcp", port); err != nil { + return nil, err + } + + tcpCmd := &TCPCommand{ + Address: parsedAddr.String(), + Port: port, + } + return tcpCmd, nil +} + +type nullWriter struct{} + +func (w *nullWriter) Write(p []byte) (n int, err error) { return len(p), nil } diff --git a/server/vclient_service.go b/server/vclient_service.go new file mode 100644 index 00000000..1e0167e6 --- /dev/null +++ b/server/vclient_service.go @@ -0,0 +1,187 @@ +package server + +import ( + "context" + "fmt" + "net" + "sync/atomic" + "time" + + "golang.org/x/crypto/ssh" + + "github.com/fatedier/frp/pkg/config" + v1 "github.com/fatedier/frp/pkg/config/v1" + "github.com/fatedier/frp/pkg/msg" + plugin "github.com/fatedier/frp/pkg/plugin/server" + "github.com/fatedier/frp/pkg/util/log" + frp_net "github.com/fatedier/frp/pkg/util/net" + "github.com/fatedier/frp/pkg/util/util" + "github.com/fatedier/frp/pkg/util/xlog" + "github.com/fatedier/frp/server/controller" + "github.com/fatedier/frp/server/proxy" +) + +// VirtualService is a client VirtualService run in frps +type VirtualService struct { + clientCfg v1.ClientCommonConfig + pxyCfg v1.ProxyConfigurer + serverCfg v1.ServerConfig + + sshSvc *SSHService + + // uniq id got from frps, attach it in loginMsg + runID string + loginMsg *msg.Login + + // All resource managers and controllers + rc *controller.ResourceController + + exit uint32 // 0 means not exit + // SSHService context + ctx context.Context + // call cancel to stop SSHService + cancel context.CancelFunc + + replyCh chan interface{} + pxy proxy.Proxy +} + +func NewVirtualService( + ctx context.Context, + clientCfg v1.ClientCommonConfig, + serverCfg v1.ServerConfig, + logMsg msg.Login, + rc *controller.ResourceController, + pxyCfg v1.ProxyConfigurer, + sshSvc *SSHService, + replyCh chan interface{}, +) (svr *VirtualService, err error) { + svr = &VirtualService{ + clientCfg: clientCfg, + serverCfg: serverCfg, + rc: rc, + + loginMsg: &logMsg, + + sshSvc: sshSvc, + pxyCfg: pxyCfg, + + ctx: ctx, + exit: 0, + + replyCh: replyCh, + } + + svr.runID, err = util.RandID() + if err != nil { + return nil, err + } + + go svr.loopCheck() + + return +} + +func (svr *VirtualService) Run(ctx context.Context) (err error) { + ctx, cancel := context.WithCancel(ctx) + svr.ctx = xlog.NewContext(ctx, xlog.New()) + svr.cancel = cancel + + log.Info("get svr pxy: %v", util.JSONDump(svr.pxyCfg)) + + remoteAddr, err := svr.RegisterProxy(&msg.NewProxy{ + ProxyName: svr.pxyCfg.(*v1.TCPProxyConfig).Name, + ProxyType: svr.pxyCfg.(*v1.TCPProxyConfig).Type, + RemotePort: svr.pxyCfg.(*v1.TCPProxyConfig).RemotePort, + }) + if err != nil { + return err + } + + log.Info("run a reverse proxy on port: %v", remoteAddr) + + return nil +} + +func (svr *VirtualService) Close() { + svr.GracefulClose(time.Duration(0)) +} + +func (svr *VirtualService) GracefulClose(d time.Duration) { + atomic.StoreUint32(&svr.exit, 1) + svr.pxy.Close() + + if svr.cancel != nil { + svr.cancel() + } + + svr.replyCh <- &VProxyError{} +} + +func (svr *VirtualService) loopCheck() { + <-svr.sshSvc.Exit() + svr.pxy.Close() + log.Info("virtual client service close") +} + +func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) { + var pxyConf v1.ProxyConfigurer + pxyConf, err = config.NewProxyConfigurerFromMsg(pxyMsg, &svr.serverCfg) + if err != nil { + return + } + + // User info + userInfo := plugin.UserInfo{ + User: svr.loginMsg.User, + Metas: svr.loginMsg.Metas, + RunID: svr.runID, + } + + svr.pxy, err = proxy.NewProxy(svr.ctx, &proxy.Options{ + LoginMsg: svr.loginMsg, + UserInfo: userInfo, + Configurer: pxyConf, + ResourceController: svr.rc, + + GetWorkConnFn: svr.GetWorkConn, + PoolCount: 10, + + ServerCfg: &svr.serverCfg, + }) + if err != nil { + return remoteAddr, err + } + + remoteAddr, err = svr.pxy.Run() + if err != nil { + log.Warn("proxy run error: %v", err) + return + } + + defer func() { + if err != nil { + log.Warn("proxy close") + svr.pxy.Close() + } + }() + + return +} + +func (svr *VirtualService) GetWorkConn() (workConn net.Conn, err error) { + // tell ssh client open a new stream for work + payload := forwardedTCPPayload{ + Addr: svr.serverCfg.BindAddr, + Port: uint32(svr.pxyCfg.(*v1.TCPProxyConfig).RemotePort), + } + + channel, reqs, err := svr.sshSvc.SSHConn().OpenChannel(ChannelTypeServerOpenChannel, ssh.Marshal(payload)) + if err != nil { + return nil, fmt.Errorf("open ssh channel error: %v", err) + } + go ssh.DiscardRequests(reqs) + + workConn = frp_net.WrapReadWriteCloserToConn(channel, svr.sshSvc.tcpConn) + return workConn, nil +}