feat: http stream filter

This commit is contained in:
blizard863 2021-08-08 23:46:49 +08:00
parent 998e678a7f
commit 8428c2c01f
17 changed files with 543 additions and 64 deletions

View File

@ -51,6 +51,13 @@ func (svr *Service) RunAdminServer(address string) (err error) {
http.Redirect(w, r, "/static/", http.StatusMovedPermanently)
})
// http filter
router.HandleFunc("/api/http/list", svr.httpFilterList).Methods("GET")
router.HandleFunc("/api/http/raw_data/{id}", svr.httpFilterGetRaw).Methods("GET")
router.HandleFunc("/api/http/replay/{id}", svr.httpFilterReplay).Methods("PUT")
router.HandleFunc("/api/http/remove/{id}", svr.httpFilterRemove).Methods("DELETE")
router.HandleFunc("/api/http/clear", svr.httpFilterClear).Methods("PUT")
server := &http.Server{
Addr: address,
Handler: router,

87
client/http_filter_api.go Normal file
View File

@ -0,0 +1,87 @@
package client
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"github.com/fatedier/frp/pkg/plugin/interceptor"
"github.com/fatedier/frp/pkg/util/cache"
"github.com/fatedier/frp/pkg/util/log"
"github.com/gorilla/mux"
)
func (svr *Service) httpFilterList(w http.ResponseWriter, r *http.Request) {
keys := cache.DefaultCache.Keys()
if len(keys) <= 0 {
log.Debug("there is no http stream be cached")
w.WriteHeader(200)
return
}
data, _ := json.Marshal(keys)
w.WriteHeader(200)
w.Write(data)
}
func (svr *Service) httpFilterGetRaw(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
dataRaw, ok := cache.DefaultCache.Get(id)
if !ok {
log.Warn("key not found: %v", id)
w.WriteHeader(404)
return
}
data, _ := json.Marshal(dataRaw)
w.WriteHeader(200)
w.Write(data)
}
func (svr *Service) httpFilterReplay(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
dataRaw, ok := cache.DefaultCache.Get(id)
if !ok {
log.Warn("key not found: %v", id)
w.WriteHeader(404)
return
}
pair, ok := dataRaw.(interceptor.Pair)
if !ok {
log.Warn("data type not match")
w.WriteHeader(500)
return
}
req, _ := http.NewRequest(pair.Req.Method, pair.Req.URL, bytes.NewBuffer(pair.Req.Body))
req.Header = pair.Req.Header
req.Host = pair.Req.Host
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Warn("replay request: %v got error: %v", pair.Req, err)
w.WriteHeader(500)
return
}
defer resp.Body.Close()
data, _ := ioutil.ReadAll(resp.Body)
log.Debug("replay get response body: %v", string(data))
w.WriteHeader(200)
}
func (svr *Service) httpFilterRemove(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
cache.DefaultCache.Remove(id)
w.WriteHeader(200)
}
func (svr *Service) httpFilterClear(w http.ResponseWriter, r *http.Request) {
cache.DefaultCache.Purge()
w.WriteHeader(200)
}

View File

@ -28,6 +28,7 @@ import (
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/client"
"github.com/fatedier/frp/pkg/plugin/filter"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
@ -128,6 +129,8 @@ type TCPProxy struct {
cfg *config.TCPProxyConf
proxyPlugin plugin.Plugin
streamFilter *filter.HTTPStreamFilter
}
func (pxy *TCPProxy) Run() (err error) {
@ -137,6 +140,14 @@ func (pxy *TCPProxy) Run() (err error) {
return
}
}
if pxy.cfg.StreamFilterType != "" {
pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams)
if err != nil {
return
}
}
return
}
@ -148,7 +159,7 @@ func (pxy *TCPProxy) Close() {
func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter)
}
// TCP Multiplexer
@ -157,6 +168,8 @@ type TCPMuxProxy struct {
cfg *config.TCPMuxProxyConf
proxyPlugin plugin.Plugin
streamFilter *filter.HTTPStreamFilter
}
func (pxy *TCPMuxProxy) Run() (err error) {
@ -166,6 +179,14 @@ func (pxy *TCPMuxProxy) Run() (err error) {
return
}
}
if pxy.cfg.StreamFilterType != "" {
pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams)
if err != nil {
return
}
}
return
}
@ -177,7 +198,7 @@ func (pxy *TCPMuxProxy) Close() {
func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter)
}
// HTTP
@ -186,6 +207,8 @@ type HTTPProxy struct {
cfg *config.HTTPProxyConf
proxyPlugin plugin.Plugin
streamFilter *filter.HTTPStreamFilter
}
func (pxy *HTTPProxy) Run() (err error) {
@ -195,6 +218,14 @@ func (pxy *HTTPProxy) Run() (err error) {
return
}
}
if pxy.cfg.StreamFilterType != "" {
pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams)
if err != nil {
return
}
}
return
}
@ -206,7 +237,7 @@ func (pxy *HTTPProxy) Close() {
func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter)
}
// HTTPS
@ -215,6 +246,8 @@ type HTTPSProxy struct {
cfg *config.HTTPSProxyConf
proxyPlugin plugin.Plugin
streamFilter *filter.HTTPStreamFilter
}
func (pxy *HTTPSProxy) Run() (err error) {
@ -224,6 +257,14 @@ func (pxy *HTTPSProxy) Run() (err error) {
return
}
}
if pxy.cfg.StreamFilterType != "" {
pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams)
if err != nil {
return
}
}
return
}
@ -235,7 +276,7 @@ func (pxy *HTTPSProxy) Close() {
func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter)
}
// STCP
@ -244,6 +285,8 @@ type STCPProxy struct {
cfg *config.STCPProxyConf
proxyPlugin plugin.Plugin
streamFilter *filter.HTTPStreamFilter
}
func (pxy *STCPProxy) Run() (err error) {
@ -253,6 +296,14 @@ func (pxy *STCPProxy) Run() (err error) {
return
}
}
if pxy.cfg.StreamFilterType != "" {
pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams)
if err != nil {
return
}
}
return
}
@ -264,7 +315,7 @@ func (pxy *STCPProxy) Close() {
func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
conn, []byte(pxy.clientCfg.Token), m, pxy.streamFilter)
}
// XTCP
@ -273,6 +324,8 @@ type XTCPProxy struct {
cfg *config.XTCPProxyConf
proxyPlugin plugin.Plugin
streamFilter *filter.HTTPStreamFilter
}
func (pxy *XTCPProxy) Run() (err error) {
@ -282,6 +335,14 @@ func (pxy *XTCPProxy) Run() (err error) {
return
}
}
if pxy.cfg.StreamFilterType != "" {
pxy.streamFilter, err = filter.NewHTTPStreamFilter(pxy.cfg.LocalSvrConf, pxy.cfg.StreamFilterParams)
if err != nil {
return
}
}
return
}
@ -414,7 +475,7 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
}
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
muxConn, []byte(pxy.cfg.Sk), m)
muxConn, []byte(pxy.cfg.Sk), m, pxy.streamFilter)
}
func (pxy *XTCPProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
@ -722,7 +783,9 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
// Common handler for tcp work connections.
func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn,
streamFilter *filter.HTTPStreamFilter) {
xl := xlog.FromContextSafe(ctx)
var (
remote io.ReadWriteCloser
@ -790,6 +853,13 @@ func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf
return
}
if streamFilter != nil {
xl.Debug("handle by filter")
streamFilter.Handle(remote, workConn)
xl.Debug("handle by filter finish")
return
}
localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIP, localInfo.LocalPort))
if err != nil {
workConn.Close()

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/yamux v0.0.0-20210707203944-259a57b3608c
github.com/klauspost/cpuid v1.2.0 // indirect
github.com/klauspost/reedsolomon v1.9.1 // indirect

2
go.sum
View File

@ -205,6 +205,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=

View File

@ -73,6 +73,9 @@ type LocalSvrConf struct {
// PluginParams specify parameters to be passed to the plugin, if one is
// being used. By default, this value is an empty map.
PluginParams map[string]string `ini:"-"`
StreamFilterType string `ini:"stream_filter_type" json:"stream_filter_type"`
StreamFilterParams map[string]string `ini:"-"`
}
// HealthCheckConf configures health checking. This can be useful for load

View File

@ -23,6 +23,8 @@ import (
"net/http/httputil"
"strings"
"github.com/fatedier/frp/pkg/plugin/interceptor"
"github.com/fatedier/frp/pkg/util/listener"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
@ -37,13 +39,15 @@ type HTTP2HTTPSPlugin struct {
localAddr string
headers map[string]string
l *Listener
l *listener.Listener
s *http.Server
}
func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) {
localAddr := params["plugin_local_addr"]
hostHeaderRewrite := params["plugin_host_header_rewrite"]
filter := params["stream_filter_type"]
headers := make(map[string]string)
for k, v := range params {
if !strings.HasPrefix(k, "plugin_header_") {
@ -58,7 +62,7 @@ func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) {
return nil, fmt.Errorf("plugin_local_addr is required")
}
listener := NewProxyListener()
listener := listener.NewProxyListener()
p := &HTTP2HTTPSPlugin{
localAddr: localAddr,
@ -67,8 +71,11 @@ func NewHTTP2HTTPSPlugin(params map[string]string) (Plugin, error) {
l: listener,
}
tr := &http.Transport{
tr := interceptor.NewTransportWrapper(&http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
if filter == "http" {
tr.WithCacheInterceptor()
}
rp := &httputil.ReverseProxy{

View File

@ -22,6 +22,8 @@ import (
"net/http"
"strings"
"github.com/fatedier/frp/pkg/plugin/interceptor"
"github.com/fatedier/frp/pkg/util/listener"
frpNet "github.com/fatedier/frp/pkg/util/net"
frpIo "github.com/fatedier/golib/io"
@ -35,21 +37,32 @@ func init() {
}
type HTTPProxy struct {
l *Listener
l *listener.Listener
s *http.Server
AuthUser string
AuthPasswd string
Transport http.RoundTripper
}
func NewHTTPProxyPlugin(params map[string]string) (Plugin, error) {
user := params["plugin_http_user"]
passwd := params["plugin_http_passwd"]
listener := NewProxyListener()
filter := params["stream_filter_type"]
listener := listener.NewProxyListener()
tr := interceptor.NewTransportWrapper(http.DefaultTransport)
if filter == "http" {
tr.WithCacheInterceptor()
}
hp := &HTTPProxy{
l: listener,
AuthUser: user,
AuthPasswd: passwd,
Transport: tr,
}
hp.s = &http.Server{
@ -115,7 +128,7 @@ func (hp *HTTPProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
func (hp *HTTPProxy) HTTPHandler(rw http.ResponseWriter, req *http.Request) {
removeProxyHeaders(req)
resp, err := http.DefaultTransport.RoundTrip(req)
resp, err := hp.Transport.RoundTrip(req)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return

View File

@ -23,6 +23,8 @@ import (
"net/http/httputil"
"strings"
"github.com/fatedier/frp/pkg/plugin/interceptor"
"github.com/fatedier/frp/pkg/util/listener"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
@ -39,7 +41,7 @@ type HTTPS2HTTPPlugin struct {
localAddr string
headers map[string]string
l *Listener
l *listener.Listener
s *http.Server
}
@ -68,7 +70,9 @@ func NewHTTPS2HTTPPlugin(params map[string]string) (Plugin, error) {
return nil, fmt.Errorf("plugin_local_addr is required")
}
listener := NewProxyListener()
filter := params["stream_filter_type"]
listener := listener.NewProxyListener()
p := &HTTPS2HTTPPlugin{
crtPath: crtPath,
@ -79,6 +83,11 @@ func NewHTTPS2HTTPPlugin(params map[string]string) (Plugin, error) {
l: listener,
}
tr := interceptor.NewTransportWrapper(http.DefaultTransport)
if filter == "http" {
tr.WithCacheInterceptor()
}
rp := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
@ -90,6 +99,8 @@ func NewHTTPS2HTTPPlugin(params map[string]string) (Plugin, error) {
req.Header.Set(k, v)
}
},
Transport: tr,
}
p.s = &http.Server{

View File

@ -23,6 +23,8 @@ import (
"net/http/httputil"
"strings"
"github.com/fatedier/frp/pkg/plugin/interceptor"
"github.com/fatedier/frp/pkg/util/listener"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
@ -39,7 +41,7 @@ type HTTPS2HTTPSPlugin struct {
localAddr string
headers map[string]string
l *Listener
l *listener.Listener
s *http.Server
}
@ -68,7 +70,9 @@ func NewHTTPS2HTTPSPlugin(params map[string]string) (Plugin, error) {
return nil, fmt.Errorf("plugin_local_addr is required")
}
listener := NewProxyListener()
filter := params["stream_filter_type"]
listener := listener.NewProxyListener()
p := &HTTPS2HTTPSPlugin{
crtPath: crtPath,
@ -79,8 +83,11 @@ func NewHTTPS2HTTPSPlugin(params map[string]string) (Plugin, error) {
l: listener,
}
tr := &http.Transport{
tr := interceptor.NewTransportWrapper(&http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
if filter == "http" {
tr.WithCacheInterceptor()
}
rp := &httputil.ReverseProxy{

View File

@ -18,9 +18,6 @@ import (
"fmt"
"io"
"net"
"sync"
"github.com/fatedier/golib/errors"
)
// Creators is used for create plugins to handle connections.
@ -49,44 +46,3 @@ type Plugin interface {
Handle(conn io.ReadWriteCloser, realConn net.Conn, extraBufToLocal []byte)
Close() error
}
type Listener struct {
conns chan net.Conn
closed bool
mu sync.Mutex
}
func NewProxyListener() *Listener {
return &Listener{
conns: make(chan net.Conn, 64),
}
}
func (l *Listener) Accept() (net.Conn, error) {
conn, ok := <-l.conns
if !ok {
return nil, fmt.Errorf("listener closed")
}
return conn, nil
}
func (l *Listener) PutConn(conn net.Conn) error {
err := errors.PanicToError(func() {
l.conns <- conn
})
return err
}
func (l *Listener) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
if !l.closed {
close(l.conns)
l.closed = true
}
return nil
}
func (l *Listener) Addr() net.Addr {
return (*net.TCPAddr)(nil)
}

View File

@ -19,6 +19,7 @@ import (
"net"
"net/http"
"github.com/fatedier/frp/pkg/util/listener"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/gorilla/mux"
@ -36,7 +37,7 @@ type StaticFilePlugin struct {
httpUser string
httpPasswd string
l *Listener
l *listener.Listener
s *http.Server
}
@ -46,7 +47,7 @@ func NewStaticFilePlugin(params map[string]string) (Plugin, error) {
httpUser := params["plugin_http_user"]
httpPasswd := params["plugin_http_passwd"]
listener := NewProxyListener()
listener := listener.NewProxyListener()
sp := &StaticFilePlugin{
localPath: localPath,

View File

@ -0,0 +1,83 @@
package filter
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"time"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/plugin/interceptor"
"github.com/fatedier/frp/pkg/util/listener"
frpNet "github.com/fatedier/frp/pkg/util/net"
)
const (
StreamFilterName = "StreamFilterName"
)
type HTTPStreamFilter struct {
l *listener.Listener
s *http.Server
}
func NewHTTPStreamFilter(filterConf config.LocalSvrConf, params map[string]string) (*HTTPStreamFilter, error) {
listener := listener.NewProxyListener()
p := &HTTPStreamFilter{
l: listener,
}
localAddr := fmt.Sprintf("%v:%v", filterConf.LocalIP, filterConf.LocalPort)
tr := interceptor.NewTransportWrapper(&http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return net.Dial("tcp", localAddr)
},
ResponseHeaderTimeout: time.Duration(90) * time.Second,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
tr.WithCacheInterceptor()
rp := &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = localAddr
req.Host = localAddr
},
Transport: tr,
}
p.s = &http.Server{
Handler: rp,
}
go p.s.Serve(listener)
return p, nil
}
func (p *HTTPStreamFilter) Handle(conn io.ReadWriteCloser, realConn net.Conn) {
wrapConn := frpNet.WrapReadWriteCloserToConn(conn, realConn)
p.l.PutConn(wrapConn)
}
func (p *HTTPStreamFilter) Name() string {
return StreamFilterName
}
func (p *HTTPStreamFilter) Close() error {
if err := p.s.Close(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,106 @@
package interceptor
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"time"
"github.com/fatedier/frp/pkg/util/cache"
"github.com/fatedier/frp/pkg/util/log"
"github.com/google/uuid"
)
var (
defaultCacheDataInterceptor = NewCacheDataInterceptor(cache.DefaultCache)
)
type CacheDataInterceptor struct {
c cache.Cacher
}
func NewCacheDataInterceptor(c cache.Cacher) *CacheDataInterceptor {
return &CacheDataInterceptor{
c: c,
}
}
func (cc *CacheDataInterceptor) Intercept(req *http.Request, resp *http.Response, elapseMs int) {
var (
bodyReq []byte
bodyResp []byte
)
uniqueId := uuid.NewString()
if req.Body != nil {
bodyReq, _ = ioutil.ReadAll(req.Body)
req.Body = ioutil.NopCloser(bytes.NewReader(bodyReq))
}
log.Debug("get request url: %v, path: %v, host: %v, header: %v, body: %v", req.URL.String(), req.URL.Path, req.Host, req.Header, string(bodyReq))
rawReq := RawRequest{
Method: req.Method,
URL: req.URL.String(),
Host: req.Host,
Header: req.Header,
Body: bodyReq,
ContentLength: req.ContentLength,
}
bodyResp, _ = ioutil.ReadAll(resp.Body)
resp.Body = ioutil.NopCloser(bytes.NewReader(bodyResp))
log.Debug("get response header: %v, body: %v, use: %v ms", resp.Header, string(bodyResp), elapseMs)
rawResp := RawResponse{
StatusCode: resp.StatusCode,
Header: resp.Header,
Body: bodyResp,
ContentLength: resp.ContentLength,
}
pair := Pair{
Key: uniqueId,
Req: rawReq,
Resp: rawResp,
TrafficCaptureTime: time.Now().Unix(),
ElapseMs: elapseMs,
}
pairByte, _ := json.Marshal(pair)
if len(pairByte) <= 1024*1024 {
// limit data size less than 1MB
cc.c.Add(uniqueId, pair)
}
}
type RawRequest struct {
Method string `json:"method,omitempty"`
URL string `json:"url,omitempty"`
Host string `json:"host,omitempty"`
Header http.Header `json:"header,omitempty"`
Body []byte `json:"body,omitempty"`
ContentLength int64 `json:"content_length,omitempty"`
}
type RawResponse struct {
StatusCode int `json:"status_code,omitempty"`
Header http.Header `json:"header,omitempty"`
Body []byte `json:"body,omitempty"`
ContentLength int64 `json:"content_length,omitempty"`
}
type Pair struct {
Key string `json:"key,omitempty"`
Req RawRequest `json:"req,omitempty"`
Resp RawResponse `json:"resp,omitempty"`
TrafficCaptureTime int64 `json:"traffic_capture_time,omitempty"`
ElapseMs int `json:"elapse_ms,omitempty"`
}

View File

@ -0,0 +1,49 @@
package interceptor
import (
"net/http"
"time"
)
type Interceptor func(req *http.Request, resp *http.Response, elapsMs int)
type TransportWrapper struct {
tr http.RoundTripper
interceptors []Interceptor
}
func NewTransportWrapper(tr http.RoundTripper, is ...Interceptor) *TransportWrapper {
tw := &TransportWrapper{
tr: tr,
}
tw.interceptors = append(tw.interceptors, is...)
return tw
}
func (tw *TransportWrapper) WithInterceptors(i ...Interceptor) {
tw.interceptors = append(tw.interceptors, i...)
}
func (tw *TransportWrapper) WithCacheInterceptor() {
tw.WithInterceptors(defaultCacheDataInterceptor.Intercept)
}
func (tw *TransportWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
startTime := time.Now()
resp, err := tw.tr.RoundTrip(req)
if err != nil {
return resp, err
}
elapseMs := time.Since(startTime).Milliseconds()
go func() {
for _, op := range tw.interceptors {
op(req, resp, int(elapseMs))
}
}()
return resp, err
}

26
pkg/util/cache/cache.go vendored Normal file
View File

@ -0,0 +1,26 @@
package cache
import (
lru "github.com/hashicorp/golang-lru"
)
type Cacher interface {
Add(key, value interface{}) (evicted bool)
Contains(key interface{}) bool
Get(key interface{}) (value interface{}, ok bool)
Keys() []interface{}
Len() int
Purge()
Remove(key interface{}) bool
}
type lruCache struct {
*lru.Cache
}
func NewCache(size int) Cacher {
l, _ := lru.New(size)
return l
}
var DefaultCache = NewCache(1024)

View File

@ -0,0 +1,50 @@
package listener
import (
"fmt"
"net"
"sync"
"github.com/fatedier/golib/errors"
)
type Listener struct {
conns chan net.Conn
closed bool
mu sync.Mutex
}
func NewProxyListener() *Listener {
return &Listener{
conns: make(chan net.Conn, 64),
}
}
func (l *Listener) Accept() (net.Conn, error) {
conn, ok := <-l.conns
if !ok {
return nil, fmt.Errorf("listener closed")
}
return conn, nil
}
func (l *Listener) PutConn(conn net.Conn) error {
err := errors.PanicToError(func() {
l.conns <- conn
})
return err
}
func (l *Listener) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
if !l.closed {
close(l.conns)
l.closed = true
}
return nil
}
func (l *Listener) Addr() net.Addr {
return (*net.TCPAddr)(nil)
}