frp/server/control.go
百里(barry) c0b091f139
add proxy manager and monitor (#7)
* GitButler Integration Commit

This is an integration commit for the virtual branches that GitButler is tracking.

Due to GitButler managing multiple virtual branches, you cannot switch back and
forth between git branches and virtual branches easily. 

If you switch to another branch, GitButler will need to be reinitialized.
If you commit on this branch, GitButler will throw it away.

Here are the branches that are currently applied:
 - feat/frpcc (refs/gitbutler/feat/frpcc)
   branch head: e52195a01a6e3432cccc3ba952a8c940ad4d3fc6
   - test/main.go
   - go.sum
   - go.mod
   - server/service.go
For more information about what we're doing here, check out our docs:
https://docs.gitbutler.com/features/virtual-branches/integration-branch

* fix: barry 2024-07-03 15:36:17

* fix: barry 2024-07-03 15:44:12

* fix: barry 2024-07-03 15:46:25

* fix: barry 2024-07-03 16:52:13

* fix: barry 2024-07-03 17:30:53

* fix: barry 2024-07-03 17:42:01

* fix: barry 2024-07-03 18:43:39

* fix: barry 2024-07-03 19:36:06

* fix: barry 2024-07-03 19:43:47

* fix: barry 2024-07-04 10:51:06

* fix: barry 2024-07-04 12:29:01

* fix log and body

* fix rate calc

* fix: barry 2024-07-09 18:20:06

---------

Co-authored-by: GitButler <gitbutler@gitbutler.com>
2024-07-09 19:20:28 +08:00

585 lines
15 KiB
Go

// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"fmt"
"net"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/samber/lo"
"github.com/fatedier/frp/pkg/auth"
"github.com/fatedier/frp/pkg/config"
v1 "github.com/fatedier/frp/pkg/config/v1"
pkgerr "github.com/fatedier/frp/pkg/errors"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/transport"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/version"
"github.com/fatedier/frp/pkg/util/wait"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
"github.com/fatedier/frp/server/metrics"
"github.com/fatedier/frp/server/proxy"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunID map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunID: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runID string, ctl *Control) (old *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
var ok bool
old, ok = cm.ctlsByRunID[runID]
if ok {
old.Replaced(ctl)
}
cm.ctlsByRunID[runID] = ctl
return
}
// we should make sure if it's the same control to prevent delete a new one
func (cm *ControlManager) Del(runID string, ctl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
if c, ok := cm.ctlsByRunID[runID]; ok && c == ctl {
delete(cm.ctlsByRunID, runID)
}
}
func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunID[runID]
return
}
func (cm *ControlManager) Close() error {
cm.mu.Lock()
defer cm.mu.Unlock()
for _, ctl := range cm.ctlsByRunID {
ctl.Close()
}
cm.ctlsByRunID = make(map[string]*Control)
return nil
}
type Control struct {
// all resource managers and controllers
rc *controller.ResourceController
// proxy manager
pxyManager *proxy.Manager
// plugin manager
pluginManager *plugin.Manager
// verifies authentication based on selected method
authVerifier auth.Verifier
// other components can use this to communicate with client
msgTransporter transport.MessageTransporter
// msgDispatcher is a wrapper for control connection.
// It provides a channel for sending messages, and you can register handlers to process messages based on their respective types.
msgDispatcher *msg.Dispatcher
// login message
loginMsg *msg.Login
// control connection
conn net.Conn
// work connections
workConnCh chan net.Conn
// proxies in one client
proxies map[string]proxy.Proxy
// pool count
poolCount int
// ports used, for limitations
portsUsedNum int
// last time got the Ping message
lastPing atomic.Value
// A new run id will be generated when a new client login.
// If run id got from login message has same run id, it means it's the same client, so we can
// replace old controller instantly.
runID string
mu sync.RWMutex
// Server configuration information
serverCfg *v1.ServerConfig
xl *xlog.Logger
ctx context.Context
doneCh chan struct{}
}
// TODO(fatedier): Referencing the implementation of frpc, encapsulate the input parameters as SessionContext.
func NewControl(
ctx context.Context,
rc *controller.ResourceController,
pxyManager *proxy.Manager,
pluginManager *plugin.Manager,
authVerifier auth.Verifier,
ctlConn net.Conn,
ctlConnEncrypted bool,
loginMsg *msg.Login,
serverCfg *v1.ServerConfig,
) (*Control, error) {
poolCount := loginMsg.PoolCount
if poolCount > int(serverCfg.Transport.MaxPoolCount) {
poolCount = int(serverCfg.Transport.MaxPoolCount)
}
ctl := &Control{
rc: rc,
pxyManager: pxyManager,
pluginManager: pluginManager,
authVerifier: authVerifier,
conn: ctlConn,
loginMsg: loginMsg,
workConnCh: make(chan net.Conn, poolCount+10),
proxies: make(map[string]proxy.Proxy),
poolCount: poolCount,
portsUsedNum: 0,
runID: loginMsg.RunID,
serverCfg: serverCfg,
xl: xlog.FromContextSafe(ctx),
ctx: ctx,
doneCh: make(chan struct{}),
}
ctl.lastPing.Store(time.Now())
if ctlConnEncrypted {
key := []byte(ctl.serverCfg.Auth.Token)
if ctl.serverCfg.Auth.Method == v1.AuthMethodJWT {
key = []byte(loginMsg.PrivilegeKey)
}
cryptoRW, err := netpkg.NewCryptoReadWriter(ctl.conn, key)
if err != nil {
return nil, err
}
ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
} else {
ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
}
ctl.registerMsgHandlers()
ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
return ctl, nil
}
// Start send a login success message to client and start working.
func (ctl *Control) Start() {
loginRespMsg := &msg.LoginResp{
Version: version.Full(),
RunID: ctl.runID,
Error: "",
}
_ = msg.WriteMsg(ctl.conn, loginRespMsg)
go func() {
for i := 0; i < ctl.poolCount; i++ {
// ignore error here, that means that this control is closed
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
}
}()
go ctl.worker()
}
func (ctl *Control) Close() error {
ctl.conn.Close()
return nil
}
func (ctl *Control) Replaced(newCtl *Control) {
xl := ctl.xl
xl.Infof("Replaced by client [%s]", newCtl.runID)
ctl.runID = ""
ctl.conn.Close()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Errorf("panic error: %v", err)
xl.Errorf(string(debug.Stack()))
}
}()
select {
case ctl.workConnCh <- conn:
xl.Debugf("new work connection registered")
return nil
default:
xl.Debugf("work connection pool is full, discarding")
return fmt.Errorf("work connection pool is full, discarding")
}
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
xl := ctl.xl
defer func() {
if err := recover(); err != nil {
xl.Errorf("panic error: %v", err)
xl.Errorf(string(debug.Stack()))
}
}()
var ok bool
// get a work connection from the pool
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
return
}
xl.Debugf("get work connection from pool")
default:
// no work connections available in the poll, send message to frpc to get more
if err := ctl.msgDispatcher.Send(&msg.ReqWorkConn{}); err != nil {
return nil, fmt.Errorf("control is already closed")
}
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = pkgerr.ErrCtlClosed
xl.Warnf("no work connections available, %v", err)
return
}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
err = fmt.Errorf("timeout trying to get work connection")
xl.Warnf("%v", err)
return
}
}
// When we get a work connection from pool, replace it with a new one.
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
return
}
func (ctl *Control) heartbeatWorker() {
xl := ctl.xl
// Don't need application heartbeat if TCPMux is enabled,
// yamux will do same thing.
// TODO(fatedier): let default HeartbeatTimeout to -1 if TCPMux is enabled. Users can still set it to positive value to enable it.
if !lo.FromPtr(ctl.serverCfg.Transport.TCPMux) && ctl.serverCfg.Transport.HeartbeatTimeout > 0 {
go wait.Until(func() {
if time.Since(ctl.lastPing.Load().(time.Time)) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
xl.Warnf("heartbeat timeout")
ctl.conn.Close()
return
}
}, time.Second, ctl.doneCh)
}
}
// block until Control closed
func (ctl *Control) WaitClosed() {
<-ctl.doneCh
}
func (ctl *Control) worker() {
xl := ctl.xl
go ctl.heartbeatWorker()
go ctl.msgDispatcher.Run()
<-ctl.msgDispatcher.Done()
ctl.conn.Close()
ctl.mu.Lock()
defer ctl.mu.Unlock()
close(ctl.workConnCh)
for workConn := range ctl.workConnCh {
workConn.Close()
}
for _, pxy := range ctl.proxies {
pxy.Close()
ctl.pxyManager.Del(pxy.GetName())
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: msg.CloseProxy{
ProxyName: pxy.GetName(),
},
}
go func() {
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
}
metrics.Server.CloseClient()
xl.Infof("client exit success")
close(ctl.doneCh)
}
func (ctl *Control) registerMsgHandlers() {
ctl.msgDispatcher.RegisterHandler(&msg.NewProxy{}, ctl.handleNewProxy)
ctl.msgDispatcher.RegisterHandler(&msg.Ping{}, ctl.handlePing)
ctl.msgDispatcher.RegisterHandler(&msg.ClientProxyClose{}, ctl.handleCustom)
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleVisitor{}, msg.AsyncHandler(ctl.handleNatHoleVisitor))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleClient{}, msg.AsyncHandler(ctl.handleNatHoleClient))
ctl.msgDispatcher.RegisterHandler(&msg.NatHoleReport{}, msg.AsyncHandler(ctl.handleNatHoleReport))
ctl.msgDispatcher.RegisterHandler(&msg.CloseProxy{}, ctl.handleCloseProxy)
}
func (ctl *Control) handleNewProxy(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.NewProxy)
content := &plugin.NewProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
NewProxy: *inMsg,
}
var remoteAddr string
retContent, err := ctl.pluginManager.NewProxy(content)
if err == nil {
inMsg = &retContent.NewProxy
remoteAddr, err = ctl.RegisterProxy(inMsg)
}
// register proxy in this control
resp := &msg.NewProxyResp{
ProxyName: inMsg.ProxyName,
}
if err != nil {
xl.Warnf("new proxy [%s] type [%s] error: %v", inMsg.ProxyName, inMsg.ProxyType, err)
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", inMsg.ProxyName),
err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient))
} else {
resp.RemoteAddr = remoteAddr
xl.Infof("new proxy [%s] type [%s] success", inMsg.ProxyName, inMsg.ProxyType)
metrics.Server.NewProxy(inMsg.ProxyName, inMsg.ProxyType)
}
_ = ctl.msgDispatcher.Send(resp)
}
func (ctl *Control) handleCustom(m msg.Message) {
inMsg := m.(*msg.ClientProxyClose)
xl := ctl.xl
xl.Debugf("handle custom request")
_ = ctl.msgDispatcher.Send(inMsg)
}
func (ctl *Control) handlePing(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.Ping)
content := &plugin.PingContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
Ping: *inMsg,
}
retContent, err := ctl.pluginManager.Ping(content)
var authErr string
if err == nil {
inMsg = &retContent.Ping
err = ctl.authVerifier.VerifyPing(inMsg)
if err != nil {
authErr = err.Error()
}
}
if err != nil {
xl.Warnf("received invalid ping: %v", err)
_ = ctl.msgDispatcher.Send(&msg.Pong{
Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)),
AuthErr: authErr,
})
return
}
ctl.lastPing.Store(time.Now())
xl.Debugf("receive heartbeat")
_ = ctl.msgDispatcher.Send(&msg.Pong{})
}
func (ctl *Control) handleNatHoleVisitor(m msg.Message) {
inMsg := m.(*msg.NatHoleVisitor)
ctl.rc.NatHoleController.HandleVisitor(inMsg, ctl.msgTransporter, ctl.loginMsg.User)
}
func (ctl *Control) handleNatHoleClient(m msg.Message) {
inMsg := m.(*msg.NatHoleClient)
ctl.rc.NatHoleController.HandleClient(inMsg, ctl.msgTransporter)
}
func (ctl *Control) handleNatHoleReport(m msg.Message) {
inMsg := m.(*msg.NatHoleReport)
ctl.rc.NatHoleController.HandleReport(inMsg)
}
func (ctl *Control) handleCloseProxy(m msg.Message) {
xl := ctl.xl
inMsg := m.(*msg.CloseProxy)
_ = ctl.CloseProxy(inMsg)
xl.Infof("close proxy [%s] success", inMsg.ProxyName)
}
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
var pxyConf v1.ProxyConfigurer
// Load configures from NewProxy message and validate.
pxyConf, err = config.NewProxyConfigurerFromMsg(pxyMsg, ctl.serverCfg)
if err != nil {
return
}
// User info
userInfo := plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.runID,
}
// NewProxy will return an interface Proxy.
// In fact, it creates different proxies based on the proxy type. We just call run() here.
pxy, err := proxy.NewProxy(ctl.ctx, &proxy.Options{
UserInfo: userInfo,
LoginMsg: ctl.loginMsg,
PoolCount: ctl.poolCount,
ResourceController: ctl.rc,
GetWorkConnFn: ctl.GetWorkConn,
Configurer: pxyConf,
ServerCfg: ctl.serverCfg,
})
if err != nil {
return remoteAddr, err
}
// Check ports used number in each client
if ctl.serverCfg.MaxPortsPerClient > 0 {
ctl.mu.Lock()
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
ctl.mu.Unlock()
err = fmt.Errorf("exceed the max_ports_per_client")
return
}
ctl.portsUsedNum += pxy.GetUsedPortsNum()
ctl.mu.Unlock()
defer func() {
if err != nil {
ctl.mu.Lock()
ctl.portsUsedNum -= pxy.GetUsedPortsNum()
ctl.mu.Unlock()
}
}()
}
if ctl.pxyManager.Exist(pxyMsg.ProxyName) {
err = fmt.Errorf("proxy [%s] already exists", pxyMsg.ProxyName)
return
}
remoteAddr, err = pxy.Run()
if err != nil {
return
}
defer func() {
if err != nil {
pxy.Close()
}
}()
err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
if err != nil {
return
}
ctl.mu.Lock()
ctl.proxies[pxy.GetName()] = pxy
ctl.mu.Unlock()
return
}
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
ctl.mu.Lock()
pxy, ok := ctl.proxies[closeMsg.ProxyName]
if !ok {
ctl.mu.Unlock()
return
}
if ctl.serverCfg.MaxPortsPerClient > 0 {
ctl.portsUsedNum -= pxy.GetUsedPortsNum()
}
pxy.Close()
ctl.pxyManager.Del(pxy.GetName())
delete(ctl.proxies, closeMsg.ProxyName)
ctl.mu.Unlock()
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
notifyContent := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: msg.CloseProxy{
ProxyName: pxy.GetName(),
},
}
go func() {
_ = ctl.pluginManager.CloseProxy(notifyContent)
}()
return
}