From 8428c2c01ff59f6389be3969338306c641ffa6a1 Mon Sep 17 00:00:00 2001 From: blizard863 <760076784@qq.com> Date: Sun, 8 Aug 2021 23:46:49 +0800 Subject: [PATCH] feat: http stream filter --- client/admin.go | 7 ++ client/http_filter_api.go | 87 ++++++++++++++++ client/proxy/proxy.go | 84 ++++++++++++++-- go.mod | 1 + go.sum | 2 + pkg/config/proxy.go | 3 + pkg/plugin/client/http2https.go | 13 ++- pkg/plugin/client/http_proxy.go | 19 +++- pkg/plugin/client/https2http.go | 15 ++- pkg/plugin/client/https2https.go | 13 ++- pkg/plugin/client/plugin.go | 44 -------- pkg/plugin/client/static_file.go | 5 +- pkg/plugin/filter/http_proxy_filter.go | 83 +++++++++++++++ pkg/plugin/interceptor/cache_interceprot.go | 106 ++++++++++++++++++++ pkg/plugin/interceptor/transport.go | 49 +++++++++ pkg/util/cache/cache.go | 26 +++++ pkg/util/listener/listener.go | 50 +++++++++ 17 files changed, 543 insertions(+), 64 deletions(-) create mode 100644 client/http_filter_api.go create mode 100644 pkg/plugin/filter/http_proxy_filter.go create mode 100644 pkg/plugin/interceptor/cache_interceprot.go create mode 100644 pkg/plugin/interceptor/transport.go create mode 100644 pkg/util/cache/cache.go create mode 100644 pkg/util/listener/listener.go diff --git a/client/admin.go b/client/admin.go index 364b3105..e5075f35 100644 --- a/client/admin.go +++ b/client/admin.go @@ -51,6 +51,13 @@ func (svr *Service) RunAdminServer(address string) (err error) { http.Redirect(w, r, "/static/", http.StatusMovedPermanently) }) + // http filter + router.HandleFunc("/api/http/list", svr.httpFilterList).Methods("GET") + router.HandleFunc("/api/http/raw_data/{id}", svr.httpFilterGetRaw).Methods("GET") + router.HandleFunc("/api/http/replay/{id}", svr.httpFilterReplay).Methods("PUT") + router.HandleFunc("/api/http/remove/{id}", svr.httpFilterRemove).Methods("DELETE") + router.HandleFunc("/api/http/clear", svr.httpFilterClear).Methods("PUT") + server := &http.Server{ Addr: address, Handler: router, diff --git a/client/http_filter_api.go b/client/http_filter_api.go new file mode 100644 index 00000000..25628f7f --- /dev/null +++ b/client/http_filter_api.go @@ -0,0 +1,87 @@ +package client + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/fatedier/frp/pkg/plugin/interceptor" + "github.com/fatedier/frp/pkg/util/cache" + "github.com/fatedier/frp/pkg/util/log" + "github.com/gorilla/mux" +) + +func (svr *Service) httpFilterList(w http.ResponseWriter, r *http.Request) { + keys := cache.DefaultCache.Keys() + if len(keys) <= 0 { + log.Debug("there is no http stream be cached") + w.WriteHeader(200) + return + } + + data, _ := json.Marshal(keys) + + w.WriteHeader(200) + w.Write(data) +} + +func (svr *Service) httpFilterGetRaw(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + dataRaw, ok := cache.DefaultCache.Get(id) + if !ok { + log.Warn("key not found: %v", id) + w.WriteHeader(404) + return + } + data, _ := json.Marshal(dataRaw) + + w.WriteHeader(200) + w.Write(data) +} + +func (svr *Service) httpFilterReplay(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + dataRaw, ok := cache.DefaultCache.Get(id) + if !ok { + log.Warn("key not found: %v", id) + w.WriteHeader(404) + return + } + + pair, ok := dataRaw.(interceptor.Pair) + if !ok { + log.Warn("data type not match") + w.WriteHeader(500) + return + } + + req, _ := http.NewRequest(pair.Req.Method, pair.Req.URL, bytes.NewBuffer(pair.Req.Body)) + req.Header = pair.Req.Header + req.Host = pair.Req.Host + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Warn("replay request: %v got error: %v", pair.Req, err) + w.WriteHeader(500) + return + } + defer resp.Body.Close() + + data, _ := ioutil.ReadAll(resp.Body) + log.Debug("replay get response body: %v", string(data)) + + w.WriteHeader(200) +} + +func (svr *Service) httpFilterRemove(w http.ResponseWriter, r *http.Request) { + id := mux.Vars(r)["id"] + cache.DefaultCache.Remove(id) + + w.WriteHeader(200) +} + +func (svr *Service) httpFilterClear(w http.ResponseWriter, r *http.Request) { + cache.DefaultCache.Purge() + w.WriteHeader(200) +} diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 47ab03ca..c833b044 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -28,6 +28,7 @@ import ( "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" plugin "github.com/fatedier/frp/pkg/plugin/client" + "github.com/fatedier/frp/pkg/plugin/filter" "github.com/fatedier/frp/pkg/proto/udp" "github.com/fatedier/frp/pkg/util/limit" frpNet "github.com/fatedier/frp/pkg/util/net" @@ -128,6 +129,8 @@ type TCPProxy struct { cfg *config.TCPProxyConf proxyPlugin plugin.Plugin + + streamFilter *filter.HTTPStreamFilter } func (pxy *TCPProxy) Run() (err error) { @@ -137,6 +140,14 @@ func (pxy *TCPProxy) Run() (err error) { return } } + + if pxy.cfg.StreamFilterType != "" { + pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams) + if err != nil { + return + } + } + return } @@ -148,7 +159,7 @@ func (pxy *TCPProxy) Close() { func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter, - conn, []byte(pxy.clientCfg.Token), m) + conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter) } // TCP Multiplexer @@ -157,6 +168,8 @@ type TCPMuxProxy struct { cfg *config.TCPMuxProxyConf proxyPlugin plugin.Plugin + + streamFilter *filter.HTTPStreamFilter } func (pxy *TCPMuxProxy) Run() (err error) { @@ -166,6 +179,14 @@ func (pxy *TCPMuxProxy) Run() (err error) { return } } + + if pxy.cfg.StreamFilterType != "" { + pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams) + if err != nil { + return + } + } + return } @@ -177,7 +198,7 @@ func (pxy *TCPMuxProxy) Close() { func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter, - conn, []byte(pxy.clientCfg.Token), m) + conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter) } // HTTP @@ -186,6 +207,8 @@ type HTTPProxy struct { cfg *config.HTTPProxyConf proxyPlugin plugin.Plugin + + streamFilter *filter.HTTPStreamFilter } func (pxy *HTTPProxy) Run() (err error) { @@ -195,6 +218,14 @@ func (pxy *HTTPProxy) Run() (err error) { return } } + + if pxy.cfg.StreamFilterType != "" { + pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams) + if err != nil { + return + } + } + return } @@ -206,7 +237,7 @@ func (pxy *HTTPProxy) Close() { func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter, - conn, []byte(pxy.clientCfg.Token), m) + conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter) } // HTTPS @@ -215,6 +246,8 @@ type HTTPSProxy struct { cfg *config.HTTPSProxyConf proxyPlugin plugin.Plugin + + streamFilter *filter.HTTPStreamFilter } func (pxy *HTTPSProxy) Run() (err error) { @@ -224,6 +257,14 @@ func (pxy *HTTPSProxy) Run() (err error) { return } } + + if pxy.cfg.StreamFilterType != "" { + pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams) + if err != nil { + return + } + } + return } @@ -235,7 +276,7 @@ func (pxy *HTTPSProxy) Close() { func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter, - conn, []byte(pxy.clientCfg.Token), m) + conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter) } // STCP @@ -244,6 +285,8 @@ type STCPProxy struct { cfg *config.STCPProxyConf proxyPlugin plugin.Plugin + + streamFilter *filter.HTTPStreamFilter } func (pxy *STCPProxy) Run() (err error) { @@ -253,6 +296,14 @@ func (pxy *STCPProxy) Run() (err error) { return } } + + if pxy.cfg.StreamFilterType != "" { + pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams) + if err != nil { + return + } + } + return } @@ -264,7 +315,7 @@ func (pxy *STCPProxy) Close() { func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter, - conn, []byte(pxy.clientCfg.Token), m) + conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter) } // XTCP @@ -273,6 +324,8 @@ type XTCPProxy struct { cfg *config.XTCPProxyConf proxyPlugin plugin.Plugin + + streamFilter *filter.HTTPStreamFilter } func (pxy *XTCPProxy) Run() (err error) { @@ -282,6 +335,14 @@ func (pxy *XTCPProxy) Run() (err error) { return } } + + if pxy.cfg.StreamFilterType != "" { + pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams) + if err != nil { + return + } + } + return } @@ -414,7 +475,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { } HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter, - muxConn, []byte(pxy.cfg.Sk), m) + muxConn, []byte(pxy.cfg.Sk), m, pxy.streamFilter) } func (pxy *XTCPProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) { @@ -722,7 +783,9 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { // Common handler for tcp work connections. func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, - baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { + baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn, + streamFilter *filter.HTTPStreamFilter) { + xl := xlog.FromContextSafe(ctx) var ( remote io.ReadWriteCloser @@ -790,6 +853,13 @@ func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf return } + if streamFilter != nil { + xl.Debug("handle by filter") + streamFilter.Handle(remote, workConn) + xl.Debug("handle by filter finish") + return + } + localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIP, localInfo.LocalPort)) if err != nil { workConn.Close() diff --git a/go.mod b/go.mod index 938089c7..d0867a64 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 + github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/yamux v0.0.0-20210707203944-259a57b3608c github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/reedsolomon v1.9.1 // indirect diff --git a/go.sum b/go.sum index de5929c8..8c5d68cb 100644 --- a/go.sum +++ b/go.sum @@ -205,6 +205,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= diff --git a/pkg/config/proxy.go b/pkg/config/proxy.go index c000bb30..a6bbf198 100644 --- a/pkg/config/proxy.go +++ b/pkg/config/proxy.go @@ -73,6 +73,9 @@ type LocalSvrConf struct { // PluginParams specify parameters to be passed to the plugin, if one is // being used. By default, this value is an empty map. PluginParams map[string]string `ini:"-"` + + StreamFilterType string `ini:"stream_filter_type" json:"stream_filter_type"` + StreamFilterParams map[string]string `ini:"-"` } // HealthCheckConf configures health checking. This can be useful for load diff --git a/pkg/plugin/client/http2https.go b/pkg/plugin/client/http2https.go index 3074cddf..a4fb9288 100644 --- a/pkg/plugin/client/http2https.go +++ b/pkg/plugin/client/http2https.go @@ -23,6 +23,8 @@ import ( "net/http/httputil" "strings" + "github.com/fatedier/frp/pkg/plugin/interceptor" + "github.com/fatedier/frp/pkg/util/listener" frpNet "github.com/fatedier/frp/pkg/util/net" ) @@ -37,13 +39,15 @@ type HTTP2HTTPSPlugin struct { localAddr string headers map[string]string - l *Listener + l *listener.Listener s *http.Server } func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) { localAddr := params["plugin_local_addr"] hostHeaderRewrite := params["plugin_host_header_rewrite"] + filter := params["stream_filter_type"] + headers := make(map[string]string) for k, v := range params { if !strings.HasPrefix(k, "plugin_header_") { @@ -58,7 +62,7 @@ func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) { return nil, fmt.Errorf("plugin_local_addr is required") } - listener := NewProxyListener() + listener := listener.NewProxyListener() p := &HTTP2HTTPSPlugin{ localAddr: localAddr, @@ -67,8 +71,11 @@ func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) { l: listener, } - tr := &http.Transport{ + tr := interceptor.NewTransportWrapper(&http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }) + if filter == "http" { + tr.WithCacheInterceptor() } rp := &httputil.ReverseProxy{ diff --git a/pkg/plugin/client/http_proxy.go b/pkg/plugin/client/http_proxy.go index 45bd3a9a..69313b08 100644 --- a/pkg/plugin/client/http_proxy.go +++ b/pkg/plugin/client/http_proxy.go @@ -22,6 +22,8 @@ import ( "net/http" "strings" + "github.com/fatedier/frp/pkg/plugin/interceptor" + "github.com/fatedier/frp/pkg/util/listener" frpNet "github.com/fatedier/frp/pkg/util/net" frpIo "github.com/fatedier/golib/io" @@ -35,21 +37,32 @@ func init() { } type HTTPProxy struct { - l *Listener + l *listener.Listener s *http.Server AuthUser string AuthPasswd string + + Transport http.RoundTripper } func NewHTTPProxyPlugin(params map[string]string) (Plugin, error) { user := params["plugin_http_user"] passwd := params["plugin_http_passwd"] - listener := NewProxyListener() + filter := params["stream_filter_type"] + + listener := listener.NewProxyListener() + + tr := interceptor.NewTransportWrapper(http.DefaultTransport) + if filter == "http" { + tr.WithCacheInterceptor() + } hp := &HTTPProxy{ l: listener, AuthUser: user, AuthPasswd: passwd, + + Transport: tr, } hp.s = &http.Server{ @@ -115,7 +128,7 @@ func (hp *HTTPProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { func (hp *HTTPProxy) HTTPHandler(rw http.ResponseWriter, req *http.Request) { removeProxyHeaders(req) - resp, err := http.DefaultTransport.RoundTrip(req) + resp, err := hp.Transport.RoundTrip(req) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return diff --git a/pkg/plugin/client/https2http.go b/pkg/plugin/client/https2http.go index 81806e39..e45c4762 100644 --- a/pkg/plugin/client/https2http.go +++ b/pkg/plugin/client/https2http.go @@ -23,6 +23,8 @@ import ( "net/http/httputil" "strings" + "github.com/fatedier/frp/pkg/plugin/interceptor" + "github.com/fatedier/frp/pkg/util/listener" frpNet "github.com/fatedier/frp/pkg/util/net" ) @@ -39,7 +41,7 @@ type HTTPS2HTTPPlugin struct { localAddr string headers map[string]string - l *Listener + l *listener.Listener s *http.Server } @@ -68,7 +70,9 @@ func NewHTTPS2HTTPPlugin(params map[string]string) (Plugin, error) { return nil, fmt.Errorf("plugin_local_addr is required") } - listener := NewProxyListener() + filter := params["stream_filter_type"] + + listener := listener.NewProxyListener() p := &HTTPS2HTTPPlugin{ crtPath: crtPath, @@ -79,6 +83,11 @@ func NewHTTPS2HTTPPlugin(params map[string]string) (Plugin, error) { l: listener, } + tr := interceptor.NewTransportWrapper(http.DefaultTransport) + if filter == "http" { + tr.WithCacheInterceptor() + } + rp := &httputil.ReverseProxy{ Director: func(req *http.Request) { req.URL.Scheme = "http" @@ -90,6 +99,8 @@ func NewHTTPS2HTTPPlugin(params map[string]string) (Plugin, error) { req.Header.Set(k, v) } }, + + Transport: tr, } p.s = &http.Server{ diff --git a/pkg/plugin/client/https2https.go b/pkg/plugin/client/https2https.go index 159ed398..4c98f166 100644 --- a/pkg/plugin/client/https2https.go +++ b/pkg/plugin/client/https2https.go @@ -23,6 +23,8 @@ import ( "net/http/httputil" "strings" + "github.com/fatedier/frp/pkg/plugin/interceptor" + "github.com/fatedier/frp/pkg/util/listener" frpNet "github.com/fatedier/frp/pkg/util/net" ) @@ -39,7 +41,7 @@ type HTTPS2HTTPSPlugin struct { localAddr string headers map[string]string - l *Listener + l *listener.Listener s *http.Server } @@ -68,7 +70,9 @@ func NewHTTPS2HTTPSPlugin(params map[string]string) (Plugin, error) { return nil, fmt.Errorf("plugin_local_addr is required") } - listener := NewProxyListener() + filter := params["stream_filter_type"] + + listener := listener.NewProxyListener() p := &HTTPS2HTTPSPlugin{ crtPath: crtPath, @@ -79,8 +83,11 @@ func NewHTTPS2HTTPSPlugin(params map[string]string) (Plugin, error) { l: listener, } - tr := &http.Transport{ + tr := interceptor.NewTransportWrapper(&http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }) + if filter == "http" { + tr.WithCacheInterceptor() } rp := &httputil.ReverseProxy{ diff --git a/pkg/plugin/client/plugin.go b/pkg/plugin/client/plugin.go index 6850919a..731abc39 100644 --- a/pkg/plugin/client/plugin.go +++ b/pkg/plugin/client/plugin.go @@ -18,9 +18,6 @@ import ( "fmt" "io" "net" - "sync" - - "github.com/fatedier/golib/errors" ) // Creators is used for create plugins to handle connections. @@ -49,44 +46,3 @@ type Plugin interface { Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte) Close() error } - -type Listener struct { - conns chan net.Conn - closed bool - mu sync.Mutex -} - -func NewProxyListener() *Listener { - return &Listener{ - conns: make(chan net.Conn, 64), - } -} - -func (l *Listener) Accept() (net.Conn, error) { - conn, ok := <-l.conns - if !ok { - return nil, fmt.Errorf("listener closed") - } - return conn, nil -} - -func (l *Listener) PutConn(conn net.Conn) error { - err := errors.PanicToError(func() { - l.conns <- conn - }) - return err -} - -func (l *Listener) Close() error { - l.mu.Lock() - defer l.mu.Unlock() - if !l.closed { - close(l.conns) - l.closed = true - } - return nil -} - -func (l *Listener) Addr() net.Addr { - return (*net.TCPAddr)(nil) -} diff --git a/pkg/plugin/client/static_file.go b/pkg/plugin/client/static_file.go index 1eeea3ba..8c96eaaf 100644 --- a/pkg/plugin/client/static_file.go +++ b/pkg/plugin/client/static_file.go @@ -19,6 +19,7 @@ import ( "net" "net/http" + "github.com/fatedier/frp/pkg/util/listener" frpNet "github.com/fatedier/frp/pkg/util/net" "github.com/gorilla/mux" @@ -36,7 +37,7 @@ type StaticFilePlugin struct { httpUser string httpPasswd string - l *Listener + l *listener.Listener s *http.Server } @@ -46,7 +47,7 @@ func NewStaticFilePlugin(params map[string]string) (Plugin, error) { httpUser := params["plugin_http_user"] httpPasswd := params["plugin_http_passwd"] - listener := NewProxyListener() + listener := listener.NewProxyListener() sp := &StaticFilePlugin{ localPath: localPath, diff --git a/pkg/plugin/filter/http_proxy_filter.go b/pkg/plugin/filter/http_proxy_filter.go new file mode 100644 index 00000000..910e860d --- /dev/null +++ b/pkg/plugin/filter/http_proxy_filter.go @@ -0,0 +1,83 @@ +package filter + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "time" + + "github.com/fatedier/frp/pkg/config" + "github.com/fatedier/frp/pkg/plugin/interceptor" + "github.com/fatedier/frp/pkg/util/listener" + frpNet "github.com/fatedier/frp/pkg/util/net" +) + +const ( + StreamFilterName = "StreamFilterName" +) + +type HTTPStreamFilter struct { + l *listener.Listener + s *http.Server +} + +func NewHTTPStreamFilter(filterConf config.LocalSvrConf, params map[string]string) (*HTTPStreamFilter, error) { + listener := listener.NewProxyListener() + + p := &HTTPStreamFilter{ + l: listener, + } + + localAddr := fmt.Sprintf("%v:%v", filterConf.LocalIP, filterConf.LocalPort) + + tr := interceptor.NewTransportWrapper(&http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return net.Dial("tcp", localAddr) + }, + ResponseHeaderTimeout: time.Duration(90) * time.Second, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }) + tr.WithCacheInterceptor() + + rp := &httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL.Scheme = "http" + req.URL.Host = localAddr + req.Host = localAddr + }, + Transport: tr, + } + + p.s = &http.Server{ + Handler: rp, + } + + go p.s.Serve(listener) + + return p, nil +} + +func (p *HTTPStreamFilter) Handle(conn io.ReadWriteCloser, realConn net.Conn) { + wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn) + p.l.PutConn(wrapConn) +} + +func (p *HTTPStreamFilter) Name() string { + return StreamFilterName +} + +func (p *HTTPStreamFilter) Close() error { + if err := p.s.Close(); err != nil { + return err + } + return nil +} diff --git a/pkg/plugin/interceptor/cache_interceprot.go b/pkg/plugin/interceptor/cache_interceprot.go new file mode 100644 index 00000000..f51aeb2e --- /dev/null +++ b/pkg/plugin/interceptor/cache_interceprot.go @@ -0,0 +1,106 @@ +package interceptor + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "time" + + "github.com/fatedier/frp/pkg/util/cache" + "github.com/fatedier/frp/pkg/util/log" + "github.com/google/uuid" +) + +var ( + defaultCacheDataInterceptor = NewCacheDataInterceptor(cache.DefaultCache) +) + +type CacheDataInterceptor struct { + c cache.Cacher +} + +func NewCacheDataInterceptor(c cache.Cacher) *CacheDataInterceptor { + return &CacheDataInterceptor{ + c: c, + } +} + +func (cc *CacheDataInterceptor) Intercept(req *http.Request, resp *http.Response, elapseMs int) { + var ( + bodyReq []byte + bodyResp []byte + ) + + uniqueId := uuid.NewString() + + if req.Body != nil { + bodyReq, _ = ioutil.ReadAll(req.Body) + req.Body = ioutil.NopCloser(bytes.NewReader(bodyReq)) + } + + log.Debug("get request url: %v, path: %v, host: %v, header: %v, body: %v", req.URL.String(), req.URL.Path, req.Host, req.Header, string(bodyReq)) + rawReq := RawRequest{ + Method: req.Method, + URL: req.URL.String(), + Host: req.Host, + Header: req.Header, + Body: bodyReq, + ContentLength: req.ContentLength, + } + + bodyResp, _ = ioutil.ReadAll(resp.Body) + resp.Body = ioutil.NopCloser(bytes.NewReader(bodyResp)) + + log.Debug("get response header: %v, body: %v, use: %v ms", resp.Header, string(bodyResp), elapseMs) + rawResp := RawResponse{ + StatusCode: resp.StatusCode, + Header: resp.Header, + Body: bodyResp, + ContentLength: resp.ContentLength, + } + + pair := Pair{ + Key: uniqueId, + + Req: rawReq, + Resp: rawResp, + + TrafficCaptureTime: time.Now().Unix(), + + ElapseMs: elapseMs, + } + + pairByte, _ := json.Marshal(pair) + if len(pairByte) <= 1024*1024 { + // limit data size less than 1MB + cc.c.Add(uniqueId, pair) + } +} + +type RawRequest struct { + Method string `json:"method,omitempty"` + URL string `json:"url,omitempty"` + Host string `json:"host,omitempty"` + Header http.Header `json:"header,omitempty"` + Body []byte `json:"body,omitempty"` + ContentLength int64 `json:"content_length,omitempty"` +} + +type RawResponse struct { + StatusCode int `json:"status_code,omitempty"` + Header http.Header `json:"header,omitempty"` + Body []byte `json:"body,omitempty"` + ContentLength int64 `json:"content_length,omitempty"` +} + +type Pair struct { + Key string `json:"key,omitempty"` + + Req RawRequest `json:"req,omitempty"` + Resp RawResponse `json:"resp,omitempty"` + + TrafficCaptureTime int64 `json:"traffic_capture_time,omitempty"` + + ElapseMs int `json:"elapse_ms,omitempty"` +} diff --git a/pkg/plugin/interceptor/transport.go b/pkg/plugin/interceptor/transport.go new file mode 100644 index 00000000..4408f647 --- /dev/null +++ b/pkg/plugin/interceptor/transport.go @@ -0,0 +1,49 @@ +package interceptor + +import ( + "net/http" + "time" +) + +type Interceptor func(req *http.Request, resp *http.Response, elapsMs int) + +type TransportWrapper struct { + tr http.RoundTripper + interceptors []Interceptor +} + +func NewTransportWrapper(tr http.RoundTripper, is ...Interceptor) *TransportWrapper { + tw := &TransportWrapper{ + tr: tr, + } + tw.interceptors = append(tw.interceptors, is...) + + return tw +} + +func (tw *TransportWrapper) WithInterceptors(i ...Interceptor) { + tw.interceptors = append(tw.interceptors, i...) +} + +func (tw *TransportWrapper) WithCacheInterceptor() { + tw.WithInterceptors(defaultCacheDataInterceptor.Intercept) +} + +func (tw *TransportWrapper) RoundTrip(req *http.Request) (*http.Response, error) { + startTime := time.Now() + + resp, err := tw.tr.RoundTrip(req) + if err != nil { + return resp, err + } + + elapseMs := time.Since(startTime).Milliseconds() + + go func() { + for _, op := range tw.interceptors { + op(req, resp, int(elapseMs)) + } + }() + + return resp, err +} diff --git a/pkg/util/cache/cache.go b/pkg/util/cache/cache.go new file mode 100644 index 00000000..6096bf50 --- /dev/null +++ b/pkg/util/cache/cache.go @@ -0,0 +1,26 @@ +package cache + +import ( + lru "github.com/hashicorp/golang-lru" +) + +type Cacher interface { + Add(key, value interface{}) (evicted bool) + Contains(key interface{}) bool + Get(key interface{}) (value interface{}, ok bool) + Keys() []interface{} + Len() int + Purge() + Remove(key interface{}) bool +} + +type lruCache struct { + *lru.Cache +} + +func NewCache(size int) Cacher { + l, _ := lru.New(size) + return l +} + +var DefaultCache = NewCache(1024) diff --git a/pkg/util/listener/listener.go b/pkg/util/listener/listener.go new file mode 100644 index 00000000..065c5d29 --- /dev/null +++ b/pkg/util/listener/listener.go @@ -0,0 +1,50 @@ +package listener + +import ( + "fmt" + "net" + "sync" + + "github.com/fatedier/golib/errors" +) + +type Listener struct { + conns chan net.Conn + closed bool + mu sync.Mutex +} + +func NewProxyListener() *Listener { + return &Listener{ + conns: make(chan net.Conn, 64), + } +} + +func (l *Listener) Accept() (net.Conn, error) { + conn, ok := <-l.conns + if !ok { + return nil, fmt.Errorf("listener closed") + } + return conn, nil +} + +func (l *Listener) PutConn(conn net.Conn) error { + err := errors.PanicToError(func() { + l.conns <- conn + }) + return err +} + +func (l *Listener) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + if !l.closed { + close(l.conns) + l.closed = true + } + return nil +} + +func (l *Listener) Addr() net.Addr { + return (*net.TCPAddr)(nil) +}