add close proxy op

This commit is contained in:
Harry Cheng 2022-03-02 16:02:08 +08:00
parent 19739ed31a
commit 782a7c085a
6 changed files with 108 additions and 1 deletions

View File

@ -70,7 +70,7 @@ The response can look like any of the following:
### Operation ### Operation
Currently `Login`, `NewProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported. Currently `Login`, `NewProxy`, `CloseProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported.
#### Login #### Login
@ -136,6 +136,23 @@ Create new proxy
} }
``` ```
#### CloseProxy
A previously created proxy is closed
```
{
"content": {
"user": {
"user": <string>,
"metas": map<string>string
"run_id": <string>
},
"proxy_name": <string>
}
}
```
#### Ping #### Ping
Heartbeat from frpc Heartbeat from frpc

View File

@ -26,6 +26,7 @@ import (
type Manager struct { type Manager struct {
loginPlugins []Plugin loginPlugins []Plugin
newProxyPlugins []Plugin newProxyPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin pingPlugins []Plugin
newWorkConnPlugins []Plugin newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin newUserConnPlugins []Plugin
@ -35,6 +36,7 @@ func NewManager() *Manager {
return &Manager{ return &Manager{
loginPlugins: make([]Plugin, 0), loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0), newProxyPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0), pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0), newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0), newUserConnPlugins: make([]Plugin, 0),
@ -48,6 +50,9 @@ func (m *Manager) Register(p Plugin) {
if p.IsSupport(OpNewProxy) { if p.IsSupport(OpNewProxy) {
m.newProxyPlugins = append(m.newProxyPlugins, p) m.newProxyPlugins = append(m.newProxyPlugins, p)
} }
if p.IsSupport(OpCloseProxy) {
m.closeProxyPlugins = append(m.closeProxyPlugins, p)
}
if p.IsSupport(OpPing) { if p.IsSupport(OpPing) {
m.pingPlugins = append(m.pingPlugins, p) m.pingPlugins = append(m.pingPlugins, p)
} }
@ -127,6 +132,27 @@ func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) {
return content, nil return content, nil
} }
func (m *Manager) CloseProxy(content *CloseProxyContent) {
if len(m.closeProxyPlugins) == 0 {
return
}
var (
err error
)
reqid, _ := util.RandID()
xl := xlog.New().AppendPrefix("reqid: " + reqid)
ctx := xlog.NewContext(context.Background(), xl)
ctx = NewReqidContext(ctx, reqid)
for _, p := range m.closeProxyPlugins {
_, _, err = p.Handle(ctx, OpCloseProxy, *content)
if err != nil {
xl.Warn("send CloseProxy request to plugin [%s] error: %v", p.Name(), err)
}
}
}
func (m *Manager) Ping(content *PingContent) (*PingContent, error) { func (m *Manager) Ping(content *PingContent) (*PingContent, error) {
if len(m.pingPlugins) == 0 { if len(m.pingPlugins) == 0 {
return content, nil return content, nil

View File

@ -23,6 +23,7 @@ const (
OpLogin = "Login" OpLogin = "Login"
OpNewProxy = "NewProxy" OpNewProxy = "NewProxy"
OpCloseProxy = "CloseProxy"
OpPing = "Ping" OpPing = "Ping"
OpNewWorkConn = "NewWorkConn" OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn" OpNewUserConn = "NewUserConn"

View File

@ -48,6 +48,11 @@ type NewProxyContent struct {
msg.NewProxy msg.NewProxy
} }
type CloseProxyContent struct {
User UserInfo `json:"user"`
msg.CloseProxy
}
type PingContent struct { type PingContent struct {
User UserInfo `json:"user"` User UserInfo `json:"user"`
msg.Ping msg.Ping

View File

@ -453,6 +453,17 @@ func (ctl *Control) manager() {
} }
ctl.sendCh <- resp ctl.sendCh <- resp
case *msg.CloseProxy: case *msg.CloseProxy:
content := &plugin.CloseProxyContent{
User: plugin.UserInfo{
User: ctl.loginMsg.User,
Metas: ctl.loginMsg.Metas,
RunID: ctl.loginMsg.RunID,
},
CloseProxy: *m,
}
ctl.pluginManager.CloseProxy(content)
ctl.CloseProxy(m) ctl.CloseProxy(m)
xl.Info("close proxy [%s] success", m.ProxyName) xl.Info("close proxy [%s] success", m.ProxyName)
case *msg.Ping: case *msg.Ping:

View File

@ -158,6 +158,53 @@ var _ = Describe("[Feature: Server-Plugins]", func() {
}) })
}) })
Describe("CloseProxy", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
r.Content = &plugin.CloseProxyContent{}
return &r
}
It("Validate Info", func() {
localPort := f.AllocPort()
handler := func(req *plugin.Request) *plugin.Response {
var ret plugin.Response
content := req.Content.(*plugin.CloseProxyContent)
if content.ProxyName == "tcp" {
ret.Unchange = true
} else {
ret.Reject = true
}
return &ret
}
pluginServer := NewHTTPPluginServer(localPort, newFunc, handler, nil)
f.RunServer("", pluginServer)
serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
[plugin.test]
addr = 127.0.0.1:%d
path = /handler
ops = CloseProxy
`, localPort)
clientConf := consts.DefaultClientConfig
remotePort := f.AllocPort()
clientConf += fmt.Sprintf(`
[tcp]
type = tcp
local_port = {{ .%s }}
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
time.Sleep(5 * time.Second)
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
})
Describe("Ping", func() { Describe("Ping", func() {
newFunc := func() *plugin.Request { newFunc := func() *plugin.Request {
var r plugin.Request var r plugin.Request