Skip to content

Commit

Permalink
redis 分布式监控
Browse files Browse the repository at this point in the history
  • Loading branch information
LeeEirc committed May 8, 2021
1 parent a1b29dd commit 7b60f7d
Show file tree
Hide file tree
Showing 16 changed files with 907 additions and 82 deletions.
11 changes: 10 additions & 1 deletion config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,13 @@ BOOTSTRAP_TOKEN: <PleasgeChangeSameWithJumpserver>
# GUA_HOST: 127.0.0.1

# Guacamole Server 端口号,默认4822
# GUA_PORT: 4822
# GUA_PORT: 4822

# 会话共享使用的类型 [local, redis], 默认local
# SHARE_ROOM_TYPE: local

# Redis配置
# REDIS_HOST: 127.0.0.1
# REDIS_PORT: 6379
# REDIS_PASSWORD:
# REDIS_DB_ROOM:
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/gin-contrib/sessions v0.0.3
github.com/gin-gonic/gin v1.6.3
github.com/go-redis/redis/v8 v8.8.2
github.com/gofrs/uuid v4.0.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/satori/go.uuid v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.21.3
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
72 changes: 72 additions & 0 deletions go.sum

Large diffs are not rendered by default.

28 changes: 22 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/pprof"
"os"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -58,8 +59,8 @@ func main() {
jmsService := MustJMService()
bootstrap(jmsService)
tunnelService := tunnel.GuacamoleTunnelServer{
Cache: &tunnel.GuaTunnelCache{
Tunnels: make(map[string]*tunnel.Connection),
Cache: &tunnel.GuaTunnelCacheManager{
GuaTunnelCache: NewGuaTunnelCache(),
},
SessCache: &tunnel.SessionCache{
Sessions: make(map[string]*session.TunnelSession),
Expand All @@ -76,12 +77,26 @@ func main() {
logger.Fatal(http.ListenAndServe(addr, eng))
}

func runHeartTask(jmsService *service.JMService, tunnelCache *tunnel.GuaTunnelCache) {
func NewGuaTunnelCache() tunnel.GuaTunnelCache {
switch strings.ToLower(config.GlobalConfig.ShareRoomType) {
case config.ShareTypeRedis:
return tunnel.NewGuaTunnelRedisCache(tunnel.Config{
Addr: net.JoinHostPort(config.GlobalConfig.RedisHost,
strconv.Itoa(config.GlobalConfig.RedisPort)),
Password: config.GlobalConfig.RedisPassword,
DBIndex: config.GlobalConfig.RedisDBIndex,
})
default:
return tunnel.NewLocalTunnelLocalCache()
}
}

func runHeartTask(jmsService *service.JMService, tunnelCache *tunnel.GuaTunnelCacheManager) {
// default 30s
beatTicker := time.NewTicker(time.Second * 30)
defer beatTicker.Stop()
for range beatTicker.C {
sids := tunnelCache.Range()
sids := tunnelCache.RangeActiveSessionIds()
tasks, err := jmsService.TerminalHeartBeat(sids)
if err != nil {
logger.Error(err)
Expand All @@ -103,7 +118,7 @@ func runHeartTask(jmsService *service.JMService, tunnelCache *tunnel.GuaTunnelCa
}
}

func runCleanDriverDisk(tunnelCache *tunnel.GuaTunnelCache) {
func runCleanDriverDisk(tunnelCache *tunnel.GuaTunnelCacheManager) {
// default 1 hour
cleanDriveTicker := time.NewTicker(time.Hour)
defer cleanDriveTicker.Stop()
Expand All @@ -114,7 +129,7 @@ func runCleanDriverDisk(tunnelCache *tunnel.GuaTunnelCache) {
logger.Error(err)
continue
}
currentOnlineUserIds := tunnelCache.RangeUserIds()
currentOnlineUserIds := tunnelCache.RangeActiveUserIds()
for i := range folders {
if _, ok := currentOnlineUserIds[folders[i].Name()]; ok {
continue
Expand Down Expand Up @@ -234,6 +249,7 @@ func uploadRemainReplay(jmsService *service.JMService, remainFiles map[string]st
logger.Errorf("Upload replay failed: %s", err)
continue
}
logger.Infof("Upload remain session replay %s success", absGzPath)
// 上传成功删除文件
_ = os.Remove(absGzPath)
if err = jmsService.FinishReply(sid); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Config struct {
DisableAllUpDownload bool `mapstructure:"JUMPSERVER_DISABLE_ALL_UPLOAD_DOWNLOAD"`
EnableRemoteAppUpDownLoad bool `mapstructure:"JUMPSERVER_REMOTE_APP_UPLOAD_DOWNLOAD_ENABLE"`
EnableRemoteAPPCopyPaste bool `mapstructure:"JUMPSERVER_REMOTE_APP_COPY_PASTE_ENABLE"`

ShareRoomType string `mapstructure:"SHARE_ROOM_TYPE"`
RedisHost string `mapstructure:"REDIS_HOST"`
RedisPort int `mapstructure:"REDIS_PORT"`
RedisPassword string `mapstructure:"REDIS_PASSWORD"`
RedisDBIndex int `mapstructure:"REDIS_DB_ROOM"`
}

func Setup(configPath string) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ const (
GinSessionName = "session-Lion"
GinSessionKey = "SESSION"
)

const (
ShareTypeRedis = "redis"
ShareTypeLocal = "local"
)
8 changes: 7 additions & 1 deletion pkg/jms-sdk-go/service/jms.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ var AccessKeyUnauthorized = errors.New("access key unauthorized")

var ConnectErr = errors.New("api connect err")

const minTimeOut = time.Second * 30
const (
minTimeOut = time.Second * 30

orgHeaderKey = "X-JMS-ORG"
orgHeaderValue = "ROOT"
)

func NewAuthJMService(opts ...Option) (*JMService, error) {
opt := option{
Expand All @@ -35,6 +40,7 @@ func NewAuthJMService(opts ...Option) (*JMService, error) {
if opt.sign != nil {
httpClient.SetAuthSign(opt.sign)
}
httpClient.SetHeader(orgHeaderKey, orgHeaderValue)
return &JMService{authClient: httpClient}, nil
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/session/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -230,7 +231,9 @@ func (s *Server) RegisterFinishReplayCallback(tunnel TunnelSession) func() error
// 压缩完成则删除源文件
defer os.Remove(originReplayFilePath)
if replayStorage := storage.NewReplayStorage(replayConfig); replayStorage != nil {
err = replayStorage.Upload(dstReplayFilePath, tunnel.Created.Format(recordDirTimeFormat))
targetName := strings.Join([]string{tunnel.Created.Format(recordDirTimeFormat),
tunnel.ID + ReplayFileNameSuffix}, "/")
err = replayStorage.Upload(dstReplayFilePath, targetName)
} else {
err = s.JmsService.Upload(tunnel.ID, dstReplayFilePath)
}
Expand Down
68 changes: 20 additions & 48 deletions pkg/tunnel/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,62 +3,34 @@ package tunnel
import (
"sync"

"lion/pkg/guacd"
"lion/pkg/session"
)

type GuaTunnelCache struct {
sync.Mutex
Tunnels map[string]*Connection
}

func (g *GuaTunnelCache) Add(t *Connection) {
g.Lock()
defer g.Unlock()
g.Tunnels[t.guacdTunnel.UUID] = t
type Tunneler interface {
WriteAndFlush(p []byte) (int, error)
ReadInstruction() (guacd.Instruction, error)
Close() error
}

func (g *GuaTunnelCache) Delete(t *Connection) {
g.Lock()
defer g.Unlock()
delete(g.Tunnels, t.guacdTunnel.UUID)
type GuaTunnelCache interface {
Add(*Connection)
Delete(*Connection)
Get(string) *Connection
RangeActiveSessionIds() []string
RangeActiveUserIds() map[string]struct{}
GetBySessionId(sid string) *Connection
GetMonitorTunnelerBySessionId(sid string) Tunneler
RemoveMonitorTunneler(sid string, monitorTunnel Tunneler)
}

func (g *GuaTunnelCache) Get(tid string) *Connection {
g.Lock()
defer g.Unlock()
return g.Tunnels[tid]
}

func (g *GuaTunnelCache) Range() []string {
g.Lock()
ret := make([]string, 0, len(g.Tunnels))
for i := range g.Tunnels {
ret = append(ret, g.Tunnels[i].Sess.ID)
}
g.Unlock()
return ret
}

func (g *GuaTunnelCache) RangeUserIds() map[string]struct{} {
g.Lock()
ret := make(map[string]struct{})
for i := range g.Tunnels {
currentUser := g.Tunnels[i].Sess.User
ret[currentUser.ID] = struct{}{}
}
g.Unlock()
return ret
}
var (
_ GuaTunnelCache = (*GuaTunnelLocalCache)(nil)
_ GuaTunnelCache = (*GuaTunnelRedisCache)(nil)
)

func (g *GuaTunnelCache) GetBySessionId(sid string) *Connection {
g.Lock()
defer g.Unlock()
for i := range g.Tunnels {
if sid == g.Tunnels[i].Sess.ID {
return g.Tunnels[i]
}
}
return nil
type GuaTunnelCacheManager struct {
GuaTunnelCache
}

type SessionCache struct {
Expand Down
88 changes: 88 additions & 0 deletions pkg/tunnel/cache_local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package tunnel

import (
"sync"

"lion/pkg/guacd"
"lion/pkg/logger"
)

func NewLocalTunnelLocalCache() *GuaTunnelLocalCache {
return &GuaTunnelLocalCache{
Tunnels: make(map[string]*Connection),
}
}

type GuaTunnelLocalCache struct {
sync.Mutex
Tunnels map[string]*Connection
}

func (g *GuaTunnelLocalCache) Add(t *Connection) {
g.Lock()
defer g.Unlock()
g.Tunnels[t.guacdTunnel.UUID] = t
}

func (g *GuaTunnelLocalCache) Delete(t *Connection) {
g.Lock()
defer g.Unlock()
delete(g.Tunnels, t.guacdTunnel.UUID)
}

func (g *GuaTunnelLocalCache) Get(tid string) *Connection {
g.Lock()
defer g.Unlock()
return g.Tunnels[tid]
}

func (g *GuaTunnelLocalCache) RangeActiveSessionIds() []string {
g.Lock()
ret := make([]string, 0, len(g.Tunnels))
for i := range g.Tunnels {
ret = append(ret, g.Tunnels[i].Sess.ID)
}
g.Unlock()
return ret
}

func (g *GuaTunnelLocalCache) RangeActiveUserIds() map[string]struct{} {
g.Lock()
ret := make(map[string]struct{})
for i := range g.Tunnels {
currentUser := g.Tunnels[i].Sess.User
ret[currentUser.ID] = struct{}{}
}
g.Unlock()
return ret
}

func (g *GuaTunnelLocalCache) GetBySessionId(sid string) *Connection {
g.Lock()
defer g.Unlock()
for i := range g.Tunnels {
if sid == g.Tunnels[i].Sess.ID {
return g.Tunnels[i]
}
}
return nil
}

func (g *GuaTunnelLocalCache) GetMonitorTunnelerBySessionId(sid string) Tunneler {
if conn := g.GetBySessionId(sid); conn != nil {
if guacdTunnel, err := conn.CloneMonitorTunnel(); err == nil {
return guacdTunnel
} else {
logger.Error(err)
}
}
return nil
}

func (g *GuaTunnelLocalCache) RemoveMonitorTunneler(sid string, monitorTunnel Tunneler) {
if conn := g.GetBySessionId(sid); conn != nil {
if tunnel, ok := monitorTunnel.(*guacd.Tunnel); ok {
conn.unTraceMonitorTunnel(tunnel)
}
}
}
Loading

0 comments on commit 7b60f7d

Please sign in to comment.