From 782a7c085ab673c5ada932fb92f4bd8fb13b3e3e Mon Sep 17 00:00:00 2001 From: Harry Cheng Date: Wed, 2 Mar 2022 16:02:08 +0800 Subject: [PATCH] add close proxy op --- doc/server_plugin.md | 19 ++++++++++++++- pkg/plugin/server/manager.go | 26 ++++++++++++++++++++ pkg/plugin/server/plugin.go | 1 + pkg/plugin/server/types.go | 5 ++++ server/control.go | 11 +++++++++ test/e2e/plugin/server.go | 47 ++++++++++++++++++++++++++++++++++++ 6 files changed, 108 insertions(+), 1 deletion(-) diff --git a/doc/server_plugin.md b/doc/server_plugin.md index 3697053b..9506fa7b 100644 --- a/doc/server_plugin.md +++ b/doc/server_plugin.md @@ -70,7 +70,7 @@ The response can look like any of the following: ### Operation -Currently `Login`, `NewProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported. +Currently `Login`, `NewProxy`, `CloseProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported. #### Login @@ -136,6 +136,23 @@ Create new proxy } ``` +#### CloseProxy + +A previously created proxy is closed + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "proxy_name": + } +} +``` + #### Ping Heartbeat from frpc diff --git a/pkg/plugin/server/manager.go b/pkg/plugin/server/manager.go index bc882889..55f32f8e 100644 --- a/pkg/plugin/server/manager.go +++ b/pkg/plugin/server/manager.go @@ -26,6 +26,7 @@ import ( type Manager struct { loginPlugins []Plugin newProxyPlugins []Plugin + closeProxyPlugins []Plugin pingPlugins []Plugin newWorkConnPlugins []Plugin newUserConnPlugins []Plugin @@ -35,6 +36,7 @@ func NewManager() *Manager { return &Manager{ loginPlugins: make([]Plugin, 0), newProxyPlugins: make([]Plugin, 0), + closeProxyPlugins: make([]Plugin, 0), pingPlugins: make([]Plugin, 0), newWorkConnPlugins: make([]Plugin, 0), newUserConnPlugins: make([]Plugin, 0), @@ -48,6 +50,9 @@ func (m *Manager) Register(p Plugin) { if p.IsSupport(OpNewProxy) { m.newProxyPlugins = append(m.newProxyPlugins, p) } + if p.IsSupport(OpCloseProxy) { + m.closeProxyPlugins = append(m.closeProxyPlugins, p) + } if p.IsSupport(OpPing) { m.pingPlugins = append(m.pingPlugins, p) } @@ -127,6 +132,27 @@ func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) { return content, nil } +func (m *Manager) CloseProxy(content *CloseProxyContent) { + if len(m.closeProxyPlugins) == 0 { + return + } + + var ( + err error + ) + reqid, _ := util.RandID() + xl := xlog.New().AppendPrefix("reqid: " + reqid) + ctx := xlog.NewContext(context.Background(), xl) + ctx = NewReqidContext(ctx, reqid) + + for _, p := range m.closeProxyPlugins { + _, _, err = p.Handle(ctx, OpCloseProxy, *content) + if err != nil { + xl.Warn("send CloseProxy request to plugin [%s] error: %v", p.Name(), err) + } + } +} + func (m *Manager) Ping(content *PingContent) (*PingContent, error) { if len(m.pingPlugins) == 0 { return content, nil diff --git a/pkg/plugin/server/plugin.go b/pkg/plugin/server/plugin.go index 160d12a2..0d34de54 100644 --- a/pkg/plugin/server/plugin.go +++ b/pkg/plugin/server/plugin.go @@ -23,6 +23,7 @@ const ( OpLogin = "Login" OpNewProxy = "NewProxy" + OpCloseProxy = "CloseProxy" OpPing = "Ping" OpNewWorkConn = "NewWorkConn" OpNewUserConn = "NewUserConn" diff --git a/pkg/plugin/server/types.go b/pkg/plugin/server/types.go index 4df79f46..d7d98cb6 100644 --- a/pkg/plugin/server/types.go +++ b/pkg/plugin/server/types.go @@ -48,6 +48,11 @@ type NewProxyContent struct { msg.NewProxy } +type CloseProxyContent struct { + User UserInfo `json:"user"` + msg.CloseProxy +} + type PingContent struct { User UserInfo `json:"user"` msg.Ping diff --git a/server/control.go b/server/control.go index 25adc2d2..4ffb0fa2 100644 --- a/server/control.go +++ b/server/control.go @@ -453,6 +453,17 @@ func (ctl *Control) manager() { } ctl.sendCh <- resp case *msg.CloseProxy: + content := &plugin.CloseProxyContent{ + User: plugin.UserInfo{ + User: ctl.loginMsg.User, + Metas: ctl.loginMsg.Metas, + RunID: ctl.loginMsg.RunID, + }, + CloseProxy: *m, + } + + ctl.pluginManager.CloseProxy(content) + ctl.CloseProxy(m) xl.Info("close proxy [%s] success", m.ProxyName) case *msg.Ping: diff --git a/test/e2e/plugin/server.go b/test/e2e/plugin/server.go index 79ecff44..57456fc4 100644 --- a/test/e2e/plugin/server.go +++ b/test/e2e/plugin/server.go @@ -158,6 +158,53 @@ var _ = Describe("[Feature: Server-Plugins]", func() { }) }) + Describe("CloseProxy", func() { + newFunc := func() *plugin.Request { + var r plugin.Request + r.Content = &plugin.CloseProxyContent{} + return &r + } + + It("Validate Info", func() { + localPort := f.AllocPort() + handler := func(req *plugin.Request) *plugin.Response { + var ret plugin.Response + content := req.Content.(*plugin.CloseProxyContent) + if content.ProxyName == "tcp" { + ret.Unchange = true + } else { + ret.Reject = true + } + return &ret + } + pluginServer := NewHTTPPluginServer(localPort, newFunc, handler, nil) + + f.RunServer("", pluginServer) + + serverConf := consts.DefaultServerConfig + fmt.Sprintf(` + [plugin.test] + addr = 127.0.0.1:%d + path = /handler + ops = CloseProxy + `, localPort) + clientConf := consts.DefaultClientConfig + + remotePort := f.AllocPort() + clientConf += fmt.Sprintf(` + [tcp] + type = tcp + local_port = {{ .%s }} + remote_port = %d + `, framework.TCPEchoServerPort, remotePort) + + f.RunProcesses([]string{serverConf}, []string{clientConf}) + + time.Sleep(5 * time.Second) + + framework.NewRequestExpect(f).Port(remotePort).Ensure() + }) + }) + Describe("Ping", func() { newFunc := func() *plugin.Request { var r plugin.Request