diff --git a/config/config.go b/config/config.go index 599279d..d22972a 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "github.com/spf13/viper" + "github.com/toxuin/alarmserver/servers/dahua" "github.com/toxuin/alarmserver/servers/hikvision" ) @@ -12,6 +13,7 @@ type Config struct { Webhooks WebhooksConfig `json:"webhooks"` Hisilicon HisiliconConfig `json:"hisilicon"` Hikvision HikvisionConfig `json:"hikvision"` + Dahua DahuaConfig `json:"dahua"` Ftp FtpConfig `json:"ftp"` } @@ -46,6 +48,11 @@ type HikvisionConfig struct { Cams []hikvision.HikCamera `json:"cams"` } +type DahuaConfig struct { + Enabled bool `json:"enabled"` + Cams []dahua.DhCamera `json:"cams"` +} + type FtpConfig struct { Enabled bool `json:"enabled"` Port int `json:"port"` @@ -69,6 +76,7 @@ func (c *Config) SetDefaults() { viper.SetDefault("hisilicon.enabled", true) viper.SetDefault("hisilicon.port", 15002) viper.SetDefault("hikvision.enabled", false) + viper.SetDefault("dahua.enabled", false) viper.SetDefault("ftp.enabled", false) viper.SetDefault("ftp.port", 21) viper.SetDefault("ftp.allowFiles", true) @@ -84,7 +92,9 @@ func (c *Config) SetDefaults() { _ = viper.BindEnv("hisilicon.enabled", "HISILICON_ENABLED") _ = viper.BindEnv("hisilicon.port", "HISILICON_PORT", "TCP_PORT") _ = viper.BindEnv("hikvision.enabled", "HIKVISION_ENABLED") - _ = viper.BindEnv("hikvision.cams", "HIKVISION_ENABLED") + _ = viper.BindEnv("hikvision.cams", "HIKVISION_CAMS") + _ = viper.BindEnv("dahua.enabled", "DAHUA_ENABLED") + _ = viper.BindEnv("dahua.cams", "DAHUA_CAMS") _ = viper.BindEnv("ftp.enabled", "FTP_ENABLED") _ = viper.BindEnv("ftp.port", "FTP_PORT") _ = viper.BindEnv("ftp.allowFiles", "FTP_ALLOW_FILES") @@ -114,6 +124,9 @@ func (c *Config) Load() *Config { Hikvision: HikvisionConfig{ Enabled: viper.GetBool("hikvision.enabled"), }, + Dahua: DahuaConfig{ + Enabled: viper.GetBool("dahua.enabled"), + }, } if viper.IsSet("mqtt") { @@ -134,6 +147,12 @@ func (c *Config) Load() *Config { panic(fmt.Errorf("unable to decode hisilicon config, %v", err)) } } + if viper.IsSet("dahua") { + err := viper.Sub("dahua").Unmarshal(&myConfig.Dahua) + if err != nil { + panic(fmt.Errorf("unable to decode dahua config, %v", err)) + } + } if viper.IsSet("ftp") { err := viper.Sub("ftp").Unmarshal(&myConfig.Ftp) if err != nil { @@ -145,7 +164,7 @@ func (c *Config) Load() *Config { panic("Both MQTT and Webhook buses are disabled. Nothing to do!") } - if !myConfig.Hisilicon.Enabled && !myConfig.Hikvision.Enabled && !myConfig.Ftp.Enabled { + if !myConfig.Hisilicon.Enabled && !myConfig.Hikvision.Enabled && !myConfig.Dahua.Enabled && !myConfig.Ftp.Enabled { panic("No Servers are enabled. Nothing to do!") } @@ -203,6 +222,8 @@ func (c *Config) Printout() { " port: %s\n"+ " SERVER: Hikvision - enabled: %t\n"+ " camera count: %d\n"+ + " Dahua server enabled: %t\n"+ + " cams: %v\n"+ " SERVER: FTP - enabled: %t\n"+ " port: %d\n"+ " files allowed: %t\n"+ @@ -220,6 +241,8 @@ func (c *Config) Printout() { c.Hisilicon.Port, c.Hikvision.Enabled, len(c.Hikvision.Cams), + c.Dahua.Enabled, + len(c.Dahua.Cams), c.Ftp.Enabled, c.Ftp.Port, c.Ftp.AllowFiles, diff --git a/docs/config.yaml b/docs/config.yaml index d752c48..bce6bc7 100644 --- a/docs/config.yaml +++ b/docs/config.yaml @@ -21,6 +21,15 @@ hisilicon: enabled: true port: 15002 +dahua: + enabled: true + cams: + myCam: + address: 192.168.1.13 + https: false + username: admin + password: admin1234 + ftp: enabled: true port: 21 diff --git a/main.go b/main.go index 4df24a7..7682998 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "github.com/toxuin/alarmserver/buses/mqtt" "github.com/toxuin/alarmserver/buses/webhooks" conf "github.com/toxuin/alarmserver/config" + "github.com/toxuin/alarmserver/servers/dahua" "github.com/toxuin/alarmserver/servers/ftp" "github.com/toxuin/alarmserver/servers/hikvision" "github.com/toxuin/alarmserver/servers/hisilicon" @@ -83,6 +84,20 @@ func main() { } } + if config.Dahua.Enabled { + // START DAHUA SERVER + dhServer := dahua.Server{ + Debug: config.Debug, + WaitGroup: &processesWaitGroup, + Cameras: &config.Dahua.Cams, + MessageHandler: messageHandler, + } + dhServer.Start() + if config.Debug { + fmt.Println("STARTED DAHUA SERVER") + } + } + if config.Ftp.Enabled { // START FTP SERVER ftpServer := ftp.Server{ diff --git a/servers/dahua/server.go b/servers/dahua/server.go new file mode 100644 index 0000000..fcf5cdd --- /dev/null +++ b/servers/dahua/server.go @@ -0,0 +1,207 @@ +package dahua + +import ( + "fmt" + "io" + "io/ioutil" + "mime" + "mime/multipart" + "net/http" + "strconv" + "strings" + "sync" +) + +type DhCamera struct { + Debug bool + Name string `json:"name"` + Url string `json:"url"` + Username string `json:"username"` + Password string `json:"password"` + client *http.Client +} + +type Server struct { + Debug bool + WaitGroup *sync.WaitGroup + Cameras *[]DhCamera + MessageHandler func(topic string, data string) +} + +type DhEvent struct { + Camera *DhCamera + Type string + Message string +} + +type Event struct { + Code string + Action string + Index int + Data string + active bool +} + +func (camera *DhCamera) readEvents(channel chan<- DhEvent, callback func()) { + request, err := http.NewRequest("GET", camera.Url+"/cgi-bin/eventManager.cgi?action=attach&codes=All", nil) + if err != nil { + fmt.Printf("DAHUA: Error: Could not connect to camera %s\n", camera.Name) + fmt.Println("DAHUA: Error", err) + callback() + return + } + request.SetBasicAuth(camera.Username, camera.Password) + + response, err := camera.client.Do(request) + if err != nil { + fmt.Printf("DAHUA: Error opening HTTP connection to camera %s\n", camera.Name) + fmt.Println(err) + return + } + + if response.StatusCode != 200 { + fmt.Printf("DAHUA: Warning: Status Code was not 200, but %v\n", response.StatusCode) + } + + // FIGURE OUT MULTIPART BOUNDARY + mediaType, params, err := mime.ParseMediaType(response.Header.Get("Content-Type")) + if camera.Debug { + fmt.Printf("DAHUA: Media type is %s\n", mediaType) + } + + if params["boundary"] == "" { + fmt.Println("DAHUA: ERROR: Camera " + camera.Name + " does not seem to support event streaming") + callback() + return + } + multipartBoundary := params["boundary"] + + event := Event{} + + // READ PART BY PART + multipartReader := multipart.NewReader(response.Body, multipartBoundary) + for { + part, err := multipartReader.NextPart() + if err == io.EOF { + return + } + if err != nil { + fmt.Println(err) + continue + } + body, err := ioutil.ReadAll(part) + if err != nil { + fmt.Println(err) + continue + } + + if camera.Debug { + fmt.Printf("DAHUA: Read event body: %s\n", body) + } + + // EXAMPLE: "Code=VideoMotion; action=Start; index=0\r\n\r\n" + line := strings.Trim(string(body), " \n\r") + items := strings.Split(line, "; ") + keyValues := make(map[string]string, len(items)) + for _, item := range items { + parts := strings.Split(item, "=") + if len(parts) > 0 { + keyValues[parts[0]] = parts[1] + } + } + // EXAMPLE: { Code: VideoMotion, action: Start, index: 0 } + index := 0 + index, _ = strconv.Atoi(keyValues["index"]) + event.Code = keyValues["Code"] + event.Action = keyValues["action"] + event.Index = index + event.Data = keyValues["data"] + + switch event.Action { + case "Start": + if !event.active { + if camera.Debug { + fmt.Println("DAHUA: SENDING CAMERA EVENT!") + } + dahuaEvent := DhEvent{ + Camera: camera, + Type: event.Code, + Message: event.Data, + } + if dahuaEvent.Message == "" { + dahuaEvent.Message = event.Action + } + channel <- dahuaEvent + } + event.active = true + case "Stop": + event.active = false + } + } +} + +func (server *Server) addCamera(waitGroup *sync.WaitGroup, cam *DhCamera, channel chan<- DhEvent) { + waitGroup.Add(1) + + if server.Debug { + fmt.Printf("DAHUA: Adding camera %s: %s\n", cam.Name, cam.Url) + } + + if cam.client == nil { + cam.client = &http.Client{} + } + + go func() { + defer waitGroup.Done() + done := false + callback := func() { + done = true + } + + for { + if done { + break + } + go cam.readEvents(channel, callback) + } + fmt.Printf("DAHUA: Closed connection to camera %s\n", cam.Name) + }() +} + +func (server *Server) Start() { + if server.Cameras == nil || len(*server.Cameras) == 0 { + fmt.Println("DAHUA: Error: no cameras defined") + return + } + + if server.MessageHandler == nil { + fmt.Println("DAHUA: Message handler is not set for Dahua cams - that's probably not what you want") + server.MessageHandler = func(topic string, data string) { + fmt.Printf("DAHUA: Lost alarm: %s: %s\n", topic, data) + } + } + + waitGroup := sync.WaitGroup{} + eventChannel := make(chan DhEvent, 5) + + for _, camera := range *server.Cameras { + server.addCamera(&waitGroup, &camera, eventChannel) + } + + // START MESSAGE PROCESSOR + go func(waitGroup *sync.WaitGroup, channel <-chan DhEvent) { + // WAIT GROUP FOR INDIVIDUAL CAMERAS + defer waitGroup.Done() + + // EXTERNAL WAIT GROUP FOR PROCESSES + defer server.WaitGroup.Done() + server.WaitGroup.Add(1) + + for { + event := <-channel + go server.MessageHandler(event.Camera.Name+"/"+event.Type, event.Message) + } + }(&waitGroup, eventChannel) + + waitGroup.Wait() +}