ETRI_SMARTFARM_POC/router/notification.go

152 lines
2.8 KiB
Go
Raw Normal View History

2021-12-13 05:39:16 +00:00
package router
import (
"etrismartfarmpoc/watcher"
"fmt"
"log"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
var notifications []chan *Notification
var notiMutex sync.Mutex
func sendNotification(noti *Notification) {
// notiMutex.Lock()
// defer notiMutex.Unlock()
for _, ch := range notifications {
ch <- noti
}
}
func removeNotification(noti chan *Notification) {
notiMutex.Lock()
defer notiMutex.Unlock()
for i, e := range notifications {
if e == noti {
2021-12-15 08:07:52 +00:00
notifications[i] = notifications[len(notifications)-1]
notifications = notifications[:len(notifications)-1]
2021-12-13 05:39:16 +00:00
}
}
}
func GetNotification(w http.ResponseWriter, r *http.Request) {
notiChan := make(chan *Notification, 1)
notiMutex.Lock()
notifications = append(notifications, notiChan)
notiMutex.Unlock()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
for {
notification := <-notiChan
// fmt.Println("Write!!", discoveredDevices)
if conn.WriteJSON(notification) != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
removeNotification(notiChan)
notiChan = nil
return
}
}
}
var watchers = map[string]*watcher.DeviceWatcher{}
func GetDeviceWatch(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("not found did"))
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
// payload := map[string]interface{}{}
// decoder := json.NewDecoder(r.Body)
// err = decoder.Decode(&payload)
// if err != nil {
// w.WriteHeader(http.StatusBadRequest)
// w.Write([]byte(err.Error()))
// return
// }
if !checkDid(did) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("not exist did"))
return
}
s := make(chan map[string]interface{})
ec := make(chan error)
defer close(s)
watch, ok := watchers[did]
if !ok {
watch = &watcher.DeviceWatcher{}
watchers[did] = watch
}
watch.Subscribe(s)
defer func() {
cnt := watch.Desubscribe(s)
if cnt == 0 {
delete(watchers, did)
log.Println("delete watchers: ", watchers)
}
}()
go func() {
_, _, err := conn.ReadMessage()
if err != nil {
ec <- err
}
}()
for {
select {
case param := <-s:
if conn.WriteJSON(param) != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case <-r.Context().Done():
fmt.Println("see you later~")
w.WriteHeader(http.StatusOK)
w.Write([]byte("See you later~"))
return
case <-ec:
return
}
}
}
func checkDid(did string) bool {
return true
}