feat: 1. Add richer monitoring features 2. Fetch monitoring information via an HTTP port 3. Add HTTP pprof for easier debugging (#15)
* feat: 1. Add richer monitoring features 2. Fetch monitoring information via an HTTP port 3. Add HTTP pprof for easier debugging

* docs & test & fix: add monitoring performance tests, fix a retry bug in taskExecer, and update the documentation
Breeze0806 authored Jun 20, 2023
1 parent aa10bf3 commit 97dd3f2
Showing 26 changed files with 678 additions and 315 deletions.
37 changes: 8 additions & 29 deletions README.md
@@ -34,9 +34,9 @@ go-etl provides the following ETL capabilities:
| Unstructured stream | CSV | √ | √ | [read](datax/plugin/reader/csv/README.md), [write](datax/plugin/writer/csv/README.md) |
| | XLSX (excel) | √ | √ | [read](datax/plugin/reader/xlsx/README.md), [write](datax/plugin/writer/xlsx/README.md) |

### Data Synchronization User Manual

Use the [go-etl data synchronization user manual](README_USER.md) to start synchronizing data

### Data Synchronization Developer Guide

@@ -46,14 +46,6 @@ go-etl provides the following ETL capabilities:
### datax

This package provides interfaces similar to Alibaba's [DataX](https://github.com/alibaba/DataX) to implement an ETL framework in Go; currently it mainly implements the data synchronization capability of the job framework.
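For orientation, the snippet below sketches how this commit's cmd/datax/enveronment.go drives the engine. It is a minimal sketch, not the repository's actual entry point; `config.NewJSONFromBytes` is an assumed name for the config parser used in the elided part of that file, while `datax.NewEngine`, `Start`, and `Metrics().JSON()` appear in the diff below.

```go
package main

import (
	"context"
	"fmt"
	"io/ioutil"

	"github.com/Breeze0806/go-etl/config"
	"github.com/Breeze0806/go-etl/datax"
)

func main() {
	// "config.json" is a placeholder path to a job configuration file.
	buf, err := ioutil.ReadFile("config.json")
	if err != nil {
		panic(err)
	}
	// Assumption: config.NewJSONFromBytes parses the job file into the
	// configuration type that datax.NewEngine expects.
	conf, err := config.NewJSONFromBytes(buf)
	if err != nil {
		panic(err)
	}
	engine := datax.NewEngine(context.Background(), conf)
	if err := engine.Start(); err != nil { // runs the synchronization job
		panic(err)
	}
	fmt.Println(engine.Metrics().JSON()) // final metrics snapshot
}
```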

#### plan

- [x] Implement task splitting for relational databases
- [x] Implement the monitoring module
- [x] Implement the flow-control module
- [ ] Implement resumable (checkpoint) writes into relational databases

### storage

#### database
@@ -65,28 +57,11 @@ go-etl provides the following ETL capabilities:
Mainly used for parsing byte streams, such as files, message queues, and elasticsearch; the byte-stream format can be csv, json, xml, and so on

#### file

Mainly used for parsing files such as csv and excel

#### mq

##### plan

No schedule yet; contributions are welcome

#### elasticsearch

##### plan

No schedule yet; contributions are welcome

### transform

Mainly used for SQL-like data transformation, similar to Baidu's engine

#### plan

- [ ] Introduce the mysql parsing capability of the tidb database
- [ ] Introduce the mysql function evaluation capability of the tidb database
- [ ] Use the mysql parsing and function evaluation capabilities to implement data transformation

### tools

@@ -107,6 +82,10 @@ go generate ./...

A tool for adding new data source plugin templates: it generates a reader or writer template and, used together with the release command, reduces the developer's workload

##### plugin

A tool for packaging data source plugins

#### license

Automatically adds license headers to Go source files
43 changes: 24 additions & 19 deletions README_USER.md
@@ -1,4 +1,4 @@
# go-etl Data Synchronization User Manual

go-etl's datax is a data synchronization tool that currently supports synchronizing data between mainstream relational databases such as MySQL, Postgres, Oracle, SQL Server and DB2, as well as between csv and xlsx files.

@@ -104,23 +104,24 @@ datax -c config.json
}
}
```
#### Flow-Control Configuration

Previously, the byte and record settings under speed had no effect. With the new flow-control feature they now take effect: byte limits the number of buffered message bytes, and record limits the number of buffered messages. If byte is set too small, the buffer becomes too small and data synchronization will fail. When byte is 0 or negative, the limiter does not work. For example, byte = 10485760 means 10MB (10*1024*1024).
```json
{
    "job":{
        "setting":{
            "speed":{
                "byte":10485760,
                "record":1024,
                "channel":4
            }
        }
    }
}
```
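For intuition, a token bucket is one common way to enforce such a byte budget; below is a minimal sketch assuming golang.org/x/time/rate, which may differ from go-etl's actual limiter implementation:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Allow roughly 10MB/s with a 10MB burst, mirroring "byte": 10485760.
	limiter := rate.NewLimiter(rate.Limit(10*1024*1024), 10*1024*1024)
	msg := make([]byte, 512*1024) // a hypothetical 512KB message

	for i := 0; i < 4; i++ {
		// WaitN blocks until sending len(msg) bytes fits within the budget.
		if err := limiter.WaitN(context.Background(), len(msg)); err != nil {
			panic(err)
		}
		fmt.Printf("sent message %d (%d bytes)\n", i, len(msg))
	}
}
```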
The supported `reader` and `writer` data sources are as follows:

| Type | Data source | Reader | Writer | Docs |
| ------------ | ------------------ | ------ | ------ | ------------------------------------------------------------ |
| Relational database | MySQL/Mariadb/Tidb | √ | √ | [read](datax/plugin/reader/mysql/README.md), [write](datax/plugin/writer/mysql/README.md) |
| | Postgres/Greenplum | √ | √ | [read](datax/plugin/reader/postgres/README.md), [write](datax/plugin/writer/postgres/README.md) |
| | DB2 LUW | √ | √ | [read](datax/plugin/reader/db2/README.md), [write](datax/plugin/writer/db2/README.md) |
| | SQL Server | √ | √ | [read](datax/plugin/reader/sqlserver/README.md), [write](datax/plugin/writer/sqlserver/README.md) |
| | Oracle | √ | √ | [read](datax/plugin/reader/oracle/README.md), [write](datax/plugin/writer/oracle/README.md) |
| Unstructured stream | CSV | √ | √ | [read](datax/plugin/reader/csv/README.md), [write](datax/plugin/writer/csv/README.md) |
| | XLSX (excel) | √ | √ | [read](datax/plugin/reader/xlsx/README.md), [write](datax/plugin/writer/xlsx/README.md) |

#### 2.1.2 Usage Examples

Note that on Linux, as shown in the Makefile, you need `export LD_LIBRARY_PATH=${DB2HOME}/lib`

##### 2.1.2.1 Synchronizing with mysql

The `reader` and `writer` are configured as follows:

@@ -446,13 +447,17 @@ datax -h
Display help

```bash
Usage of datax:
  -c string
        config (default "config.json")
  -http string
        http
  -w string
        wizard
```

-http adds a listening port, e.g. 8080; once enabled, visit 127.0.0.1:8080/metrics to get real-time throughput
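As an illustration (not part of this commit), the metrics endpoint can be polled with any HTTP client; the address below assumes datax was started with `-http :8080`:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumes datax is running with: datax -c config.json -http :8080
	resp, err := http.Get("http://127.0.0.1:8080/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // JSON snapshot of real-time throughput
}
```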

#### 2.3.2 Checking the Version

75 changes: 72 additions & 3 deletions cmd/datax/enveronment.go
@@ -16,10 +16,15 @@ package main

import (
"context"
"fmt"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"time"

"github.com/Breeze0806/go-etl/config"
"github.com/Breeze0806/go-etl/datax"
"github.com/gorilla/handlers"
)

type enveronment struct {
@@ -28,9 +33,11 @@ type enveronment struct {
err error
ctx context.Context
cancel context.CancelFunc
server *http.Server
addr string
}

func newEnveronment(filename string, addr string) (e *enveronment) {
e = &enveronment{}
var buf []byte
buf, e.err = ioutil.ReadFile(filename)
@@ -42,20 +49,82 @@ func newEnveronment(filename string) (e *enveronment) {
return e
}
e.ctx, e.cancel = context.WithCancel(context.Background())
e.addr = addr
return e
}

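// build chains engine construction, optional HTTP server startup, and engine execution, propagating the first error.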
func (e *enveronment) build() error {
return e.initEngine().initServer().startEngine().err
}

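// initEngine constructs the datax engine from the parsed job configuration.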
func (e *enveronment) initEngine() *enveronment {
	if e.err != nil {
		return e
	}
}
e.engine = datax.NewEngine(e.ctx, e.config)

return e
}

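// initServer starts an HTTP server on e.addr (when -http is given) that serves /metrics with panic recovery and gzip compression.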
func (e *enveronment) initServer() *enveronment {
if e.err != nil {
return e
}
if e.addr != "" {
r := http.NewServeMux()
recoverHandler := handlers.RecoveryHandler(handlers.PrintRecoveryStack(true))
r.Handle("/metrics", recoverHandler(newHandler(e.engine)))
e.server = &http.Server{
Addr: e.addr,
Handler: handlers.CompressHandler(r),
}
go func() {
log.Debugf("listen begin: %v", e.addr)
defer log.Debugf("listen end: %v", e.addr)
if err := e.server.ListenAndServe(); err != nil {
log.Errorf("ListenAndServe fail. addr: %v err: %v", e.addr, err)
}
log.Infof("ListenAndServe success. addr: %v", e.addr)
}()
}

return e
}

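// startEngine spawns a goroutine that continuously rewrites a one-line JSON metrics snapshot on stdout until the context is done, then starts the engine and records its error.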
func (e *enveronment) startEngine() *enveronment {
if e.err != nil {
return e
}
go func() {
statsTimer := time.NewTicker(5 * time.Second)
defer statsTimer.Stop()
exit := false
for {
select {
case <-statsTimer.C:
case <-e.ctx.Done():
exit = true
default:
}
if e.engine.Container != nil {
fmt.Printf("%v\r", e.engine.Metrics().JSON())
}

if exit {
return
}
}
}()
	e.err = e.engine.Start()
	return e
}

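// close shuts down the HTTP server, if one was started, and cancels the engine's context.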
func (e *enveronment) close() {
if e.server != nil {
e.server.Shutdown(e.ctx)
}

if e.cancel != nil {
e.cancel()
}
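For context, a hypothetical caller would exercise this type roughly as follows (a sketch; the actual main.go wires the -c and -http flags to these arguments):

```go
// Hypothetical driver; "config.json" and ":8080" are placeholder values.
func run() error {
	e := newEnveronment("config.json", ":8080")
	defer e.close()
	return e.build()
}
```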
4 changes: 2 additions & 2 deletions cmd/datax/examples/limit/config.json
@@ -3,7 +3,7 @@
"container": {
"job":{
"id": 1,
"sleepInterval":100
"sleepInterval":1000
}
}
},
@@ -31,7 +31,7 @@
],
"setting":{
"speed":{
"byte":10485760,
"byte":204800,
"record":1024,
"channel":4
}
40 changes: 0 additions & 40 deletions cmd/datax/examples/limit/csv.json

This file was deleted.

63 changes: 0 additions & 63 deletions cmd/datax/examples/split/mysql.json

This file was deleted.

