diff --git a/containers/devicemanagerb/constants/constants.go b/containers/devicemanagerb/constants/constants.go new file mode 100644 index 0000000..d87249c --- /dev/null +++ b/containers/devicemanagerb/constants/constants.go @@ -0,0 +1,32 @@ +package constants + +import ( + "fmt" + "net" + "os" + "strings" +) + +func getIP() string { + host, _ := os.Hostname() + addrs, _ := net.LookupIP(host) + + return addrs[0].String() +} + +var ServerAddr string +var MyIP string + +func init() { + var exist bool + ServerAddr, exist = os.LookupEnv("SERVER_ADDR") + if !exist { + fmt.Println("Please set SERVER_ADDR as environment variable") + } + + MyIP = getIP() + idx := strings.LastIndex(MyIP, ".") + ServerAddr = MyIP[:idx+1] + "1" + fmt.Println(ServerAddr) + +} diff --git a/containers/devicemanagerb/go.mod b/containers/devicemanagerb/go.mod index ae71788..1099b6f 100644 --- a/containers/devicemanagerb/go.mod +++ b/containers/devicemanagerb/go.mod @@ -1,3 +1,8 @@ module devicemanagerb go 1.17 + +require ( + github.com/gorilla/mux v1.7.4 + github.com/urfave/negroni v1.0.0 +) diff --git a/containers/devicemanagerb/go.sum b/containers/devicemanagerb/go.sum index e69de29..1947147 100644 --- a/containers/devicemanagerb/go.sum +++ b/containers/devicemanagerb/go.sum @@ -0,0 +1,4 @@ +github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= +github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc= +github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= diff --git a/containers/devicemanagerb/main.go b/containers/devicemanagerb/main.go index 9df4de8..7c6ac3e 100644 --- a/containers/devicemanagerb/main.go +++ b/containers/devicemanagerb/main.go @@ -2,46 +2,26 @@ package main import ( "bytes" + "devicemanagerb/constants" "devicemanagerb/router" "encoding/json" "fmt" "io/ioutil" - "net" "net/http" - "os" - "strings" ) -var server_addr string - -func getIP() string { - host, _ := os.Hostname() - addrs, _ := net.LookupIP(host) - - return addrs[0].String() -} func registerToServer() { - var exist bool - server_addr, exist = os.LookupEnv("SERVER_ADDR") - if !exist { - fmt.Println("Please set SERVER_ADDR as environment variable") - } - - ip := getIP() - idx := strings.LastIndex(ip, ".") - serverAddr := ip[:idx+1] + "1" - fmt.Println(serverAddr) var obj map[string]string = make(map[string]string) obj["name"] = "devicemanagerb" - obj["addr"] = getIP() + ":3000" + obj["addr"] = constants.MyIP + ":3000" b, err := json.Marshal(obj) if err != nil { panic(err) } - req, err := http.NewRequest("PUT", "http://"+serverAddr+":3000/services", bytes.NewBuffer(b)) + req, err := http.NewRequest("PUT", "http://"+constants.ServerAddr+":3000/services", bytes.NewBuffer(b)) if err != nil { panic(err) } @@ -56,7 +36,6 @@ func registerToServer() { panic(err) } fmt.Println(string(b)) - // http.Post("") } func main() { diff --git a/containers/devicemanagerb/router/route.go b/containers/devicemanagerb/router/route.go index f1728f5..df0c076 100644 --- a/containers/devicemanagerb/router/route.go +++ b/containers/devicemanagerb/router/route.go @@ -1,15 +1,150 @@ package router import ( + "bytes" + "devicemanagerb/constants" + "encoding/json" + "io/ioutil" + "log" "net/http" + + "github.com/gorilla/mux" + "github.com/urfave/negroni" ) func NewRouter() http.Handler { - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte("I am devicemanagerB")) - }) + mux := mux.NewRouter() + // mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + // w.WriteHeader(http.StatusOK) + // w.Write([]byte("I am devicemanagerB")) + // }) - return mux + mux.HandleFunc("/{id}", PutStatusChangedHandle).Methods("PUT") + mux.HandleFunc("/{id}", PostStatusChangedHandle).Methods("POST") + mux.HandleFunc("/{id}", GetStatusHandle).Methods("GET") + + n := negroni.Classic() + n.UseHandler(mux) + return n } + +// sensing data per device +var s_data = map[string]interface{}{} + +// status from sensor +func PutStatusChangedHandle(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + status := map[string]interface{}{} + err = json.Unmarshal(b, &status) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + did, ok := vars["id"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + s_data[did] = status + log.Println(s_data) + + cdata, ok := c_data[did] + if !ok { + w.Write([]byte("I am devicemanagerB")) + } else { + encoder := json.NewEncoder(w) + err := encoder.Encode(cdata) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + } + + req, err := http.NewRequest("PUT", "http://"+constants.ServerAddr+":3000/device/"+did, bytes.NewReader(b)) + if err != nil { + return + } + + _, err = http.DefaultClient.Do(req) + if err != nil { + panic(err) + } + +} + +var c_data = map[string]interface{}{} + +func GetStatusHandle(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + did, ok := vars["id"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + + status, ok := c_data[did] + if !ok { + status = map[string]interface{}{ + "servo": 0, + "fan": 0, + "light": 0, + } + } + + encoder := json.NewEncoder(w) + + err := encoder.Encode(status) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + w.WriteHeader(http.StatusOK) +} + +func PostStatusChangedHandle(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + status := map[string]interface{}{} + err = json.Unmarshal(b, &status) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + did, ok := vars["id"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + c_data[did] = status + log.Println(c_data) + + w.WriteHeader(http.StatusOK) +} + +// status from user per device diff --git a/init.sh b/init.sh index 3fe52f0..8327a30 100644 --- a/init.sh +++ b/init.sh @@ -1 +1,3 @@ -rm -r *.db *.pcap \ No newline at end of file +rm -r *.db +docker container stop $(docker container ls -q) +docker container prune \ No newline at end of file diff --git a/model/controller.go b/model/controller.go index 6830092..43f628c 100644 --- a/model/controller.go +++ b/model/controller.go @@ -55,10 +55,5 @@ func (s *dbHandler) IsExistController(cid string) bool { result := s.db.First(&controller, "cid=?", cid) - if result.Error != nil { - fmt.Println(result.Error) - return false - } - - return true + return result.Error == nil } diff --git a/model/device.go b/model/device.go index 552551b..b7eb18e 100644 --- a/model/device.go +++ b/model/device.go @@ -41,3 +41,33 @@ func (s *dbHandler) AddDevice(device *Device) error { return nil } + +func (s *dbHandler) QueryDevice(dname string) (*Device, error) { + var device Device + tx := s.db.First(&device, "dname=?", dname) + + if tx.Error != nil { + return nil, tx.Error + } + + return &device, nil +} +func (s *dbHandler) DeleteDevice(device *Device) error { + + tx := s.db.Delete(device) + if tx.Error != nil { + return tx.Error + } + + tx.First(device, "did=?", device.DID) + return nil + +} + +func (s *dbHandler) IsExistDevice(dname string) bool { + var device = Device{} + + result := s.db.First(&device, "dname=?", dname) + + return result.Error != nil +} diff --git a/model/interface.go b/model/interface.go index 3eb01de..ac1c777 100644 --- a/model/interface.go +++ b/model/interface.go @@ -16,6 +16,9 @@ import ( type DBHandler interface { GetDevices() ([]*Device, int, error) AddDevice(d *Device) error + QueryDevice(dname string) (*Device, error) + DeleteDevice(device *Device) error + IsExistDevice(dname string) bool AddController(r io.Reader) (*Controller, error) GetControllers() ([]*Controller, error) IsExistController(cid string) bool diff --git a/model/service.go b/model/service.go index 1dca6b6..a155f9b 100644 --- a/model/service.go +++ b/model/service.go @@ -1,6 +1,8 @@ package model import ( + "errors" + "github.com/google/uuid" "gorm.io/gorm" ) @@ -77,6 +79,9 @@ func (s *dbHandler) GetSID(name string) (string, error) { return "", tx.Error } + if len(service.SID) == 0 { + return "", errors.New("not installed service") + } return service.SID, nil } diff --git a/public-ws/index.html b/public-ws/index.html new file mode 100644 index 0000000..54131ab --- /dev/null +++ b/public-ws/index.html @@ -0,0 +1,14 @@ + + + + + + Web socket sample + + + + +
hello
+ + + \ No newline at end of file diff --git a/public-ws/index.js b/public-ws/index.js new file mode 100644 index 0000000..2b1e1d2 --- /dev/null +++ b/public-ws/index.js @@ -0,0 +1,25 @@ +window.onload = () => { + if (!window.WebSocket) { + alert("No WebSocket!"); + return; + } + function connect() { + let ws = new WebSocket(`ws://${window.location.host}/device/73531420-ae8c-47be-9eff-308d946f3a65`); + ws.onopen = (e) => { + console.log("onopen", arguments); + } + + ws.onclose = () => { + console.log("onclose", arguments); + } + + ws.onmessage = function (e) { + console.log(e.data); + // addMessage(JSON.parse(e.data)); + console.log(JSON.parse(e.data)); + } + return ws; + } + + ws = connect(); +} \ No newline at end of file diff --git a/router/controller.go b/router/controller.go new file mode 100644 index 0000000..eae3a74 --- /dev/null +++ b/router/controller.go @@ -0,0 +1,50 @@ +package router + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func GetControllerList(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + list, err := db.GetControllers() + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + fmt.Println(list) + encoder := json.NewEncoder(w) + + err = encoder.Encode(list) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } +} + +func PostController(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + controller, err := db.AddController(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + w.WriteHeader(http.StatusCreated) + encoder := json.NewEncoder(w) + err = encoder.Encode(controller) + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + sendNotification(&Notification{Msg: "Added Controller"}) +} diff --git a/router/devices.go b/router/devices.go new file mode 100644 index 0000000..3c03401 --- /dev/null +++ b/router/devices.go @@ -0,0 +1,240 @@ +package router + +import ( + "encoding/json" + "etrismartfarmpoc/model" + "etrismartfarmpoc/watcher" + "fmt" + "net/http" + "sync" + "time" + + "github.com/google/uuid" + "github.com/gorilla/mux" +) + +var waitPermission = map[string]chan bool{} +var mutex sync.Mutex +var discoveredDevices = []*model.Device{} + +func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + encoder := json.NewEncoder(w) + encoder.Encode(discoveredDevices) +} + +func removeDevice(device *model.Device) { + for i, e := range discoveredDevices { + if e == device { + discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1] + discoveredDevices = discoveredDevices[:len(discoveredDevices)-1] + } + } +} + +func GetDevices(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + devices, _, err := db.GetDevices() + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + encoder := json.NewEncoder(w) + encoder.Encode(devices) +} + +// PostDevice : Handle for Receiving discovery message +func PostDevice(w http.ResponseWriter, r *http.Request) { + var device = &model.Device{} + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(device) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + // 등록된 제어기로 부터 전송된 요청 메시지임을 확인 + if !db.IsExistController(device.CID) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Wrong Controller ID")) + return + } + + if !db.IsExistDevice(device.DName) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Already exist device")) + return + } + + // 장치 ID 생성 및 탐색된 장치 추가 + device.DID = uuid.NewString() + mutex.Lock() + waitPermission[device.DID] = make(chan bool) + discoveredDevices = append(discoveredDevices, device) + + // 관리자에게 탐색을 알림 + sendNotification(&Notification{Msg: "Add discovered device"}) + mutex.Unlock() + + timer := time.NewTimer(20 * time.Second) + + select { + case <-r.Context().Done(): + fmt.Println("^^") + mutex.Lock() + defer mutex.Unlock() + delete(waitPermission, device.DID) + removeDevice(device) + sendNotification(&Notification{Msg: "Remove discovered device"}) + w.WriteHeader(http.StatusOK) + w.Write([]byte("This operation is not permitted")) + return + case <-timer.C: + mutex.Lock() + defer mutex.Unlock() + delete(waitPermission, device.DID) + removeDevice(device) + sendNotification(&Notification{Msg: "Remove discovered device"}) + w.WriteHeader(http.StatusOK) + w.Write([]byte("This operation is not permitted")) + return + case b := <-waitPermission[device.DID]: + if b { + // 디바이스 등록 절차 수행 + db.AddDevice(device) + db.AddService(device.SName) + + // 디바이스 등록 알림 + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(device) + sendNotification(&Notification{Msg: "Added Device"}) + mutex.Lock() + defer mutex.Unlock() + delete(waitPermission, device.DID) + removeDevice(device) + sendNotification(&Notification{Msg: "Add device"}) + } else { + mutex.Lock() + defer mutex.Unlock() + delete(waitPermission, device.DID) + removeDevice(device) + sendNotification(&Notification{Msg: "Remove discovered device"}) + w.WriteHeader(http.StatusOK) + w.Write([]byte("This operation is not permitted")) + } + } +} + +func DeleteDevice(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + msg := map[string]string{} + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(&msg) + + device, err := db.QueryDevice(msg["dname"]) + db.DeleteDevice(device) + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + } + + sendNotification(&Notification{Msg: "Delete device"}) + w.WriteHeader(http.StatusOK) +} + +func PutDevice(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + msg := map[string]string{} + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(&msg) + + fmt.Println("msg[did]: ", msg["did"]) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + } + + ch, ok := waitPermission[msg["did"]] + if !ok { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Wrong Device ID is Sended")) + return + } + ch <- true + w.WriteHeader(http.StatusOK) +} + +// 값 변경 알림 +func PutDeviceStatus(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + did, ok := vars["id"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("not found did")) + return + } + + if r.ContentLength == 0 { + w.WriteHeader(http.StatusOK) + return + } + + watch, ok := watchers[did] + if !ok { + w.WriteHeader(http.StatusOK) + return + } + + param := map[string]interface{}{} + decoder := json.NewDecoder(r.Body) + + err := decoder.Decode(¶m) + if err != nil { + w.WriteHeader(http.StatusOK) + return + } + + watch.Notify(watcher.NewStateChangedEvent(nil, param)) + w.WriteHeader(http.StatusOK) +} + +// func PostDeviceStatus(w http.ResponseWriter, r *http.Request) { +// vars := mux.Vars(r) +// did, ok := vars["id"] +// if !ok { +// w.WriteHeader(http.StatusBadRequest) +// w.Write([]byte("not found did")) +// return +// } + +// if r.ContentLength == 0 { +// w.WriteHeader(http.StatusOK) +// return +// } + +// watch, ok := watchers[did] +// if !ok { +// w.WriteHeader(http.StatusOK) +// return +// } + +// param := map[string]interface{}{} +// decoder := json.NewDecoder(r.Body) + +// err := decoder.Decode(¶m) +// if err != nil { +// w.WriteHeader(http.StatusOK) +// return +// } + +// watch.Notify(watcher.NewStateChangedEvent(nil, param)) +// } diff --git a/router/notification.go b/router/notification.go new file mode 100644 index 0000000..678ee99 --- /dev/null +++ b/router/notification.go @@ -0,0 +1,151 @@ +package router + +import ( + "etrismartfarmpoc/watcher" + "fmt" + "log" + "net/http" + "sync" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { return true }, +} + +var notifications []chan *Notification +var notiMutex sync.Mutex + +func sendNotification(noti *Notification) { + // notiMutex.Lock() + // defer notiMutex.Unlock() + for _, ch := range notifications { + ch <- noti + } +} + +func removeNotification(noti chan *Notification) { + notiMutex.Lock() + defer notiMutex.Unlock() + for i, e := range notifications { + if e == noti { + discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1] + discoveredDevices = discoveredDevices[:len(discoveredDevices)-1] + } + } +} + +func GetNotification(w http.ResponseWriter, r *http.Request) { + notiChan := make(chan *Notification, 1) + notiMutex.Lock() + notifications = append(notifications, notiChan) + notiMutex.Unlock() + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + for { + notification := <-notiChan + // fmt.Println("Write!!", discoveredDevices) + if conn.WriteJSON(notification) != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + removeNotification(notiChan) + notiChan = nil + return + } + } +} + +var watchers = map[string]*watcher.DeviceWatcher{} + +func GetDeviceWatch(w http.ResponseWriter, r *http.Request) { + + vars := mux.Vars(r) + did, ok := vars["id"] + if !ok { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("not found did")) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + // payload := map[string]interface{}{} + // decoder := json.NewDecoder(r.Body) + // err = decoder.Decode(&payload) + // if err != nil { + // w.WriteHeader(http.StatusBadRequest) + // w.Write([]byte(err.Error())) + // return + // } + + if !checkDid(did) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("not exist did")) + return + } + + s := make(chan map[string]interface{}) + ec := make(chan error) + + defer close(s) + watch, ok := watchers[did] + + if !ok { + watch = &watcher.DeviceWatcher{} + watchers[did] = watch + } + + watch.Subscribe(s) + + defer func() { + cnt := watch.Desubscribe(s) + if cnt == 0 { + delete(watchers, did) + log.Println("delete watchers: ", watchers) + } + }() + + go func() { + _, _, err := conn.ReadMessage() + if err != nil { + ec <- err + } + }() + for { + select { + case param := <-s: + if conn.WriteJSON(param) != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + case <-r.Context().Done(): + fmt.Println("see you later~") + w.WriteHeader(http.StatusOK) + w.Write([]byte("See you later~")) + return + case <-ec: + return + } + + } +} + +func checkDid(did string) bool { + return true +} diff --git a/router/route.go b/router/route.go index 12ce678..9806bae 100644 --- a/router/route.go +++ b/router/route.go @@ -2,24 +2,16 @@ package router import ( "encoding/json" - "etrismartfarmpoc/containermgmt" "etrismartfarmpoc/model" - "fmt" - "io" - "log" "net/http" "strings" - "sync" - "time" - "github.com/google/uuid" "github.com/gorilla/mux" - "github.com/gorilla/websocket" "github.com/urfave/negroni" ) type Notification struct { - Msg string `json:msg` + Msg string `json:"msg"` } var db model.DBHandler @@ -43,7 +35,10 @@ func NewRouter() http.Handler { mux.HandleFunc("/services", PutServices).Methods("PUT") mux.HandleFunc("/devices", PostDevice).Methods("POST") mux.HandleFunc("/devices", GetDevices).Methods("GET") + mux.HandleFunc("/devices", DeleteDevice).Methods("DELETE") mux.HandleFunc("/devices", PutDevice).Methods("PUT", "OPTIONS") + mux.HandleFunc("/device/{id}", GetDeviceWatch).Methods("GET") + mux.HandleFunc("/device/{id}", PutDeviceStatus).Methods("PUT") mux.HandleFunc("/discover", GetDiscoveredDevices).Methods("GET") mux.PathPrefix("/services/").HandlerFunc(RouteRequestToService) @@ -70,101 +65,6 @@ func EchoRoute(w http.ResponseWriter, r *http.Request) { encoder.Encode(m) } -func GetControllerList(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - list, err := db.GetControllers() - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } - fmt.Println(list) - encoder := json.NewEncoder(w) - - err = encoder.Encode(list) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } -} - -func PostController(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - - controller, err := db.AddController(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } - w.WriteHeader(http.StatusCreated) - encoder := json.NewEncoder(w) - err = encoder.Encode(controller) - - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } - - sendNotification(&Notification{Msg: "Added Controller"}) -} - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, -} - -var notifications []chan *Notification -var notiMutex sync.Mutex - -func sendNotification(noti *Notification) { - // notiMutex.Lock() - // defer notiMutex.Unlock() - for _, ch := range notifications { - ch <- noti - } -} - -func removeNotification(noti chan *Notification) { - notiMutex.Lock() - defer notiMutex.Unlock() - for i, e := range notifications { - if e == noti { - discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1] - discoveredDevices = discoveredDevices[:len(discoveredDevices)-1] - } - } -} -func GetNotification(w http.ResponseWriter, r *http.Request) { - notiChan := make(chan *Notification, 1) - notiMutex.Lock() - notifications = append(notifications, notiChan) - notiMutex.Unlock() - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } - - for { - notification := <-notiChan - // fmt.Println("Write!!", discoveredDevices) - if conn.WriteJSON(notification) != nil { - log.Println(err) - w.WriteHeader(http.StatusBadRequest) - removeNotification(notiChan) - notiChan = nil - return - } - } -} - // func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) { // noti := make(chan string, 1) // mutex.Lock() @@ -187,273 +87,6 @@ func GetNotification(w http.ResponseWriter, r *http.Request) { // } // } -var waitPermission = map[string]chan bool{} -var mutex sync.Mutex -var discoveredDevices = []*model.Device{} - -func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - - encoder := json.NewEncoder(w) - encoder.Encode(discoveredDevices) -} - -func removeDevice(device *model.Device) { - for i, e := range discoveredDevices { - if e == device { - discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1] - discoveredDevices = discoveredDevices[:len(discoveredDevices)-1] - } - } -} - -func GetDevices(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - devices, _, err := db.GetDevices() - - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - encoder := json.NewEncoder(w) - encoder.Encode(devices) -} - -// PostDevice : Handle for Receiving discovery message -func PostDevice(w http.ResponseWriter, r *http.Request) { - var device = &model.Device{} - decoder := json.NewDecoder(r.Body) - err := decoder.Decode(device) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - // 등록된 제어기로 부터 전송된 요청 메시지임을 확인 - if !db.IsExistController(device.CID) { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("Wrong Controller ID")) - return - } - - // 장치 ID 생성 및 탐색된 장치 추가 - device.DID = uuid.NewString() - mutex.Lock() - waitPermission[device.DID] = make(chan bool) - discoveredDevices = append(discoveredDevices, device) - - // 관리자에게 탐색을 알림 - sendNotification(&Notification{Msg: "Add discovered device"}) - mutex.Unlock() - - timer := time.NewTimer(20 * time.Second) - - select { - case <-r.Context().Done(): - fmt.Println("^^") - mutex.Lock() - defer mutex.Unlock() - delete(waitPermission, device.DID) - removeDevice(device) - sendNotification(&Notification{Msg: "Remove discovered device"}) - w.WriteHeader(http.StatusOK) - w.Write([]byte("This operation is not permitted")) - return - case <-timer.C: - mutex.Lock() - defer mutex.Unlock() - delete(waitPermission, device.DID) - removeDevice(device) - sendNotification(&Notification{Msg: "Remove discovered device"}) - w.WriteHeader(http.StatusOK) - w.Write([]byte("This operation is not permitted")) - return - case b := <-waitPermission[device.DID]: - if b { - // 디바이스 등록 절차 수행 - db.AddDevice(device) - db.AddService(device.SName) - - // 디바이스 등록 알림 - w.WriteHeader(http.StatusCreated) - json.NewEncoder(w).Encode(device) - sendNotification(&Notification{Msg: "Added Device"}) - mutex.Lock() - defer mutex.Unlock() - delete(waitPermission, device.DID) - removeDevice(device) - sendNotification(&Notification{Msg: "Add device"}) - } else { - mutex.Lock() - defer mutex.Unlock() - delete(waitPermission, device.DID) - removeDevice(device) - sendNotification(&Notification{Msg: "Remove discovered device"}) - w.WriteHeader(http.StatusOK) - w.Write([]byte("This operation is not permitted")) - } - } -} - -func PutDevice(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - msg := map[string]string{} - decoder := json.NewDecoder(r.Body) - err := decoder.Decode(&msg) - - fmt.Println("msg[did]: ", msg["did"]) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - } - - ch, ok := waitPermission[msg["did"]] - if !ok { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte("Wrong Device ID is Sended")) - return - } - ch <- true - w.WriteHeader(http.StatusOK) -} - -func GetServices(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") - - if r.ContentLength != 0 { - obj := map[string]string{} - err := json.NewDecoder(r.Body).Decode(&obj) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - sid, err := db.GetSID(obj["sname"]) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - w.Write([]byte(sid)) - return - } - - l, err := db.GetServices() - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - err = json.NewEncoder(w).Encode(l) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } -} - -func PutServices(w http.ResponseWriter, r *http.Request) { - var obj = map[string]string{} - - decoder := json.NewDecoder(r.Body) - err := decoder.Decode(&obj) - - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - s, err := db.UpdateService(obj["name"], obj["addr"]) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - sendNotification(&Notification{Msg: "Update Service"}) - err = json.NewEncoder(w).Encode(s) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } -} - -// func GetServiceList(w http.ResponseWriter, r *http.Request) { -// result := containermgmt.GetContainers(context.Background()) -// b := result.Value(containermgmt.ReturnKey).([]byte) -// w.Write(b) -// } - -func PostService(w http.ResponseWriter, r *http.Request) { - - var obj = map[string]string{} - decoder := json.NewDecoder(r.Body) - err := decoder.Decode(&obj) - - db.IsExistService(obj["name"]) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } - - err = containermgmt.CreateContainer(obj["name"]) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - w.Write([]byte(err.Error())) - return - } - - w.WriteHeader(http.StatusCreated) -} - -func RouteRequestToService(w http.ResponseWriter, r *http.Request) { - l := len([]rune("/services/")) - var id string - var url string - idx := strings.Index(r.RequestURI[l:], "/") - if idx == -1 { - id = r.RequestURI[l:] - url = "/" - } else { - id = r.RequestURI[l : l+idx] - if len([]rune(r.RequestURI)) <= l+idx { - url = "/" - } else { - url = r.RequestURI[l+idx:] - } - } - - // w.Write([]byte(vars["url"])) - host, err := db.GetAddr(id) - if err != nil { - w.WriteHeader(http.StatusNotFound) - return - } - - log.Println("Route to", host+"/"+url) - req, err := http.NewRequest(r.Method, "http://"+host+"/"+url, r.Body) - if err != nil { - // 잘못된 메시지 포맷이 전달된 경우 - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, err) - return - } - - resp, err := http.DefaultClient.Do(req) - - if err != nil { - // 잘못된 메시지 포맷이 전달된 경우 - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, err) - return - } - - io.Copy(w, resp.Body) -} - // func mapService(urls []string) { // } diff --git a/router/services.go b/router/services.go new file mode 100644 index 0000000..377cb75 --- /dev/null +++ b/router/services.go @@ -0,0 +1,139 @@ +package router + +import ( + "encoding/json" + "etrismartfarmpoc/containermgmt" + "fmt" + "io" + "log" + "net/http" + "strings" +) + +func GetServices(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + + sname := r.Header.Get("sname") + + if len(sname) != 0 { + sid, err := db.GetSID(sname) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + w.Write([]byte(sid)) + return + } + + l, err := db.GetServices() + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + err = json.NewEncoder(w).Encode(l) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } +} + +func PutServices(w http.ResponseWriter, r *http.Request) { + var obj = map[string]string{} + + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(&obj) + + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + s, err := db.UpdateService(obj["name"], obj["addr"]) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + sendNotification(&Notification{Msg: "Update Service"}) + err = json.NewEncoder(w).Encode(s) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } +} + +// func GetServiceList(w http.ResponseWriter, r *http.Request) { +// result := containermgmt.GetContainers(context.Background()) +// b := result.Value(containermgmt.ReturnKey).([]byte) +// w.Write(b) +// } + +func PostService(w http.ResponseWriter, r *http.Request) { + + var obj = map[string]string{} + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(&obj) + + db.IsExistService(obj["name"]) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + err = containermgmt.CreateContainer(obj["name"]) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + + w.WriteHeader(http.StatusCreated) +} + +func RouteRequestToService(w http.ResponseWriter, r *http.Request) { + l := len([]rune("/services/")) + var id string + var url string + idx := strings.Index(r.RequestURI[l:], "/") + if idx == -1 { + id = r.RequestURI[l:] + url = "/" + } else { + id = r.RequestURI[l : l+idx] + if len([]rune(r.RequestURI)) <= l+idx { + url = "/" + } else { + url = r.RequestURI[l+idx:] + } + } + + // w.Write([]byte(vars["url"])) + host, err := db.GetAddr(id) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + + log.Println("Route to", host+url) + req, err := http.NewRequest(r.Method, "http://"+host+url, r.Body) + if err != nil { + // 잘못된 메시지 포맷이 전달된 경우 + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, err) + return + } + + resp, err := http.DefaultClient.Do(req) + + if err != nil { + // 잘못된 메시지 포맷이 전달된 경우 + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, err) + return + } + + io.Copy(w, resp.Body) +} diff --git a/watcher/cli.go b/watcher/cli.go new file mode 100644 index 0000000..c948175 --- /dev/null +++ b/watcher/cli.go @@ -0,0 +1,5 @@ +package watcher + +func NewStateChangedEvent(t interface{}, param map[string]interface{}) *StatusChangedEvent { + return &StatusChangedEvent{t: t, param: param} +} diff --git a/watcher/device_watcher.go b/watcher/device_watcher.go new file mode 100644 index 0000000..c45e7ff --- /dev/null +++ b/watcher/device_watcher.go @@ -0,0 +1,32 @@ +package watcher + +type DeviceWatcher struct { + // did string + subscriber []chan map[string]interface{} +} + +func (dw *DeviceWatcher) Notify(e Event) { + dw.onChanged(e) +} + +func (dw *DeviceWatcher) onChanged(e Event) { + for _, s := range dw.subscriber { + s <- e.Param() + } +} + +func (dw *DeviceWatcher) Subscribe(s chan map[string]interface{}) { + dw.subscriber = append(dw.subscriber, s) +} + +func (dw *DeviceWatcher) Desubscribe(s chan map[string]interface{}) int { + length := len(dw.subscriber) + for i, e := range dw.subscriber { + if s == e { + dw.subscriber[i] = dw.subscriber[length-1] + dw.subscriber = dw.subscriber[:length-1] + return length - 1 + } + } + return length +} diff --git a/watcher/interface.go b/watcher/interface.go new file mode 100644 index 0000000..f7744f0 --- /dev/null +++ b/watcher/interface.go @@ -0,0 +1,13 @@ +package watcher + +type Watcher interface { +} + +type Event interface { + Type() interface{} + Param() map[string]interface{} +} + +type EventListener interface { + Handle(e Event) +} diff --git a/watcher/status_changed.go b/watcher/status_changed.go new file mode 100644 index 0000000..7da0560 --- /dev/null +++ b/watcher/status_changed.go @@ -0,0 +1,44 @@ +package watcher + +type StatusChangedEvent struct { + Event + t interface{} + param map[string]interface{} +} + +func (e *StatusChangedEvent) Type() interface{} { + return e.t +} + +func (e *StatusChangedEvent) Param() map[string]interface{} { + return e.param +} + +// type StatusChangedListener struct { +// h func(e Event) +// } + +// func (l *StatusChangedListener) Handle(e Event) { +// if l.h != nil { +// l.h(e) +// } +// } + +// func (l *StatusChangedListener) AddSubscriber(s chan map[string]interface{}) { +// l.subscriber = append(l.subscriber, s) +// } + +// func (l *StatusChangedListener) RemoveSubscriber(s chan map[string]interface{}) int { +// length := len(l.subscriber) +// for i, e := range l.subscriber { +// if s == e { +// l.subscriber[i] = l.subscriber[length-1] +// l.subscriber = l.subscriber[:length-1] +// return length - 1 +// } +// } + +// return length +// } + +// func (l *StatusChangedListener) onClose() {}