Compare commits

..

4 Commits
main ... master

Author SHA1 Message Date
418b5ebad4 commit 2021-12-17]09:17:23 2021-12-17 09:17:26 +09:00
66c1674539 commit 2021-12-15]17:07:50 2021-12-15 17:07:52 +09:00
fa43e0cb9f commit 2021-12-13]14:39:07 2021-12-13 14:39:16 +09:00
3ade3a2d62 commit 2021-12-01]09:21:33 2021-12-01 09:21:36 +09:00
29 changed files with 1118 additions and 462 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
public/
front/

View File

@ -0,0 +1,32 @@
package constants
import (
"fmt"
"net"
"os"
"strings"
)
func getIP() string {
host, _ := os.Hostname()
addrs, _ := net.LookupIP(host)
return addrs[0].String()
}
var ServerAddr string
var MyIP string
func init() {
var exist bool
ServerAddr, exist = os.LookupEnv("SERVER_ADDR")
if !exist {
fmt.Println("Please set SERVER_ADDR as environment variable")
}
MyIP = getIP()
idx := strings.LastIndex(MyIP, ".")
ServerAddr = MyIP[:idx+1] + "1"
fmt.Println(ServerAddr)
}

View File

@ -1,3 +1,8 @@
module devicemanagera
module devicemanagerb
go 1.17
require (
github.com/gorilla/mux v1.7.4
github.com/urfave/negroni v1.0.0
)

View File

@ -0,0 +1,4 @@
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=

View File

@ -2,50 +2,30 @@ package main
import (
"bytes"
"devicemanagera/router"
"devicemanagerb/constants"
"devicemanagerb/router"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
)
var server_addr string
func getIP() string {
host, _ := os.Hostname()
addrs, _ := net.LookupIP(host)
return addrs[0].String()
}
func registerToServer() {
var exist bool
server_addr, exist = os.LookupEnv("SERVER_ADDR")
if !exist {
fmt.Println("Please set SERVER_ADDR as environment variable")
}
ip := getIP()
idx := strings.LastIndex(ip, ".")
serverAddr := ip[:idx+1] + "1"
fmt.Println(serverAddr)
var obj map[string]string = make(map[string]string)
obj["name"] = "devicemanagera"
obj["addr"] = getIP() + ":3000"
obj["addr"] = constants.MyIP + ":3000"
b, err := json.Marshal(obj)
if err != nil {
panic(err)
}
req, err := http.NewRequest("PUT", "http://"+serverAddr+":3000/services", bytes.NewBuffer(b))
req, err := http.NewRequest("PUT", "http://"+constants.ServerAddr+":3000/services", bytes.NewBuffer(b))
if err != nil {
panic(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
panic(err)
@ -56,7 +36,6 @@ func registerToServer() {
panic(err)
}
fmt.Println(string(b))
// http.Post("")
}
func main() {

View File

@ -1,15 +1,140 @@
package router
import (
"bytes"
"devicemanagerb/constants"
"encoding/json"
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/urfave/negroni"
)
func NewRouter() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("I am devicemanagerA"))
})
mux := mux.NewRouter()
// mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusOK)
// w.Write([]byte("I am devicemanagerB"))
// })
return mux
mux.HandleFunc("/{id}", PutStatusChangedHandle).Methods("PUT")
mux.HandleFunc("/{id}", PostStatusChangedHandle).Methods("POST")
mux.HandleFunc("/{id}", GetStatusHandle).Methods("GET")
n := negroni.Classic()
n.UseHandler(mux)
return n
}
// sensing data per device
var state = map[string]interface{}{}
// status from sensor
func PutStatusChangedHandle(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
_status := map[string]interface{}{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&_status)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
// state[did] = _status
// log.Println(state[did])
_, ok = state[did]
if !ok {
state[did] = _status
}
b, err := json.Marshal(state[did])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.Write(b)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
req, err := http.NewRequest("PUT", "http://"+constants.ServerAddr+":3000/device/"+did, bytes.NewReader(b))
if err != nil {
return
}
_, err = http.DefaultClient.Do(req)
if err != nil {
panic(err)
}
}
func GetStatusHandle(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
_status, ok := state[did]
if !ok {
_status = map[string]interface{}{
"msg": "hello world",
}
}
encoder := json.NewEncoder(w)
err := encoder.Encode(_status)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
}
func PostStatusChangedHandle(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
_status := map[string]interface{}{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&_status)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
state[did] = _status
log.Println(_status)
w.WriteHeader(http.StatusOK)
}
// status from user per device

View File

@ -0,0 +1,32 @@
package constants
import (
"fmt"
"net"
"os"
"strings"
)
func getIP() string {
host, _ := os.Hostname()
addrs, _ := net.LookupIP(host)
return addrs[0].String()
}
var ServerAddr string
var MyIP string
func init() {
var exist bool
ServerAddr, exist = os.LookupEnv("SERVER_ADDR")
if !exist {
fmt.Println("Please set SERVER_ADDR as environment variable")
}
MyIP = getIP()
idx := strings.LastIndex(MyIP, ".")
ServerAddr = MyIP[:idx+1] + "1"
fmt.Println(ServerAddr)
}

View File

@ -1,3 +1,8 @@
module devicemanagerb
go 1.17
require (
github.com/gorilla/mux v1.7.4
github.com/urfave/negroni v1.0.0
)

View File

@ -0,0 +1,4 @@
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=

View File

@ -2,46 +2,26 @@ package main
import (
"bytes"
"devicemanagerb/constants"
"devicemanagerb/router"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
)
var server_addr string
func getIP() string {
host, _ := os.Hostname()
addrs, _ := net.LookupIP(host)
return addrs[0].String()
}
func registerToServer() {
var exist bool
server_addr, exist = os.LookupEnv("SERVER_ADDR")
if !exist {
fmt.Println("Please set SERVER_ADDR as environment variable")
}
ip := getIP()
idx := strings.LastIndex(ip, ".")
serverAddr := ip[:idx+1] + "1"
fmt.Println(serverAddr)
var obj map[string]string = make(map[string]string)
obj["name"] = "devicemanagerb"
obj["addr"] = getIP() + ":3000"
obj["addr"] = constants.MyIP + ":3000"
b, err := json.Marshal(obj)
if err != nil {
panic(err)
}
req, err := http.NewRequest("PUT", "http://"+serverAddr+":3000/services", bytes.NewBuffer(b))
req, err := http.NewRequest("PUT", "http://"+constants.ServerAddr+":3000/services", bytes.NewBuffer(b))
if err != nil {
panic(err)
}
@ -56,7 +36,6 @@ func registerToServer() {
panic(err)
}
fmt.Println(string(b))
// http.Post("")
}
func main() {

View File

@ -1,15 +1,150 @@
package router
import (
"bytes"
"devicemanagerb/constants"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/urfave/negroni"
)
func NewRouter() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("I am devicemanagerB"))
})
mux := mux.NewRouter()
// mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusOK)
// w.Write([]byte("I am devicemanagerB"))
// })
return mux
mux.HandleFunc("/{id}", PutStatusChangedHandle).Methods("PUT")
mux.HandleFunc("/{id}", PostStatusChangedHandle).Methods("POST")
mux.HandleFunc("/{id}", GetStatusHandle).Methods("GET")
n := negroni.Classic()
n.UseHandler(mux)
return n
}
// sensing data per device
var s_data = map[string]interface{}{}
// status from sensor
func PutStatusChangedHandle(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
b, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
status := map[string]interface{}{}
err = json.Unmarshal(b, &status)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
s_data[did] = status
log.Println(s_data)
cdata, ok := c_data[did]
if !ok {
w.Write([]byte("I am devicemanagerB"))
} else {
encoder := json.NewEncoder(w)
err := encoder.Encode(cdata)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
}
req, err := http.NewRequest("PUT", "http://"+constants.ServerAddr+":3000/device/"+did, bytes.NewReader(b))
if err != nil {
return
}
_, err = http.DefaultClient.Do(req)
if err != nil {
panic(err)
}
}
var c_data = map[string]interface{}{}
func GetStatusHandle(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
status, ok := c_data[did]
if !ok {
status = map[string]interface{}{
"servo": 0,
"fan": 0,
"light": 0,
}
}
encoder := json.NewEncoder(w)
err := encoder.Encode(status)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusOK)
}
func PostStatusChangedHandle(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
b, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
status := map[string]interface{}{}
err = json.Unmarshal(b, &status)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
c_data[did] = status
log.Println(c_data)
w.WriteHeader(http.StatusOK)
}
// status from user per device

BIN
dump.db

Binary file not shown.

BIN
eth.pcap

Binary file not shown.

View File

@ -1 +1,3 @@
rm -r dump.db eth.pcap lo.pcap
rm -r *.db
docker container stop $(docker container ls -q)
docker container prune

View File

@ -55,10 +55,5 @@ func (s *dbHandler) IsExistController(cid string) bool {
result := s.db.First(&controller, "cid=?", cid)
if result.Error != nil {
fmt.Println(result.Error)
return false
}
return true
return result.Error == nil
}

View File

@ -41,3 +41,32 @@ func (s *dbHandler) AddDevice(device *Device) error {
return nil
}
func (s *dbHandler) QueryDevice(dname string) (*Device, error) {
var device Device
tx := s.db.First(&device, "dname=?", dname)
if tx.Error != nil {
return nil, tx.Error
}
return &device, nil
}
func (s *dbHandler) DeleteDevice(device *Device) error {
tx := s.db.Delete(device)
if tx.Error != nil {
return tx.Error
}
// tx.First(device, "did=?", device.DID)
return nil
}
func (s *dbHandler) IsExistDevice(dname string) bool {
var device = Device{}
result := s.db.First(&device, "dname=?", dname)
return result.Error != nil
}

View File

@ -16,6 +16,9 @@ import (
type DBHandler interface {
GetDevices() ([]*Device, int, error)
AddDevice(d *Device) error
QueryDevice(dname string) (*Device, error)
DeleteDevice(device *Device) error
IsExistDevice(dname string) bool
AddController(r io.Reader) (*Controller, error)
GetControllers() ([]*Controller, error)
IsExistController(cid string) bool

View File

@ -1,6 +1,8 @@
package model
import (
"errors"
"github.com/google/uuid"
"gorm.io/gorm"
)
@ -77,6 +79,9 @@ func (s *dbHandler) GetSID(name string) (string, error) {
return "", tx.Error
}
if len(service.SID) == 0 {
return "", errors.New("not installed service")
}
return service.SID, nil
}

14
public-ws/index.html Normal file
View File

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html>
<head>
<title>Web socket sample</title>
<script src="./index.js"></script>
</head>
<body>
<div>hello</div>
</body>
</html>

25
public-ws/index.js Normal file
View File

@ -0,0 +1,25 @@
window.onload = () => {
if (!window.WebSocket) {
alert("No WebSocket!");
return;
}
function connect() {
let ws = new WebSocket(`ws://${window.location.host}/device/73531420-ae8c-47be-9eff-308d946f3a65`);
ws.onopen = (e) => {
console.log("onopen", arguments);
}
ws.onclose = () => {
console.log("onclose", arguments);
}
ws.onmessage = function (e) {
console.log(e.data);
// addMessage(JSON.parse(e.data));
console.log(JSON.parse(e.data));
}
return ws;
}
ws = connect();
}

50
router/controller.go Normal file
View File

@ -0,0 +1,50 @@
package router
import (
"encoding/json"
"fmt"
"net/http"
)
func GetControllerList(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
list, err := db.GetControllers()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
fmt.Println(list)
encoder := json.NewEncoder(w)
err = encoder.Encode(list)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
}
func PostController(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
controller, err := db.AddController(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
encoder := json.NewEncoder(w)
err = encoder.Encode(controller)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
sendNotification(&Notification{Msg: "Added Controller"})
}

243
router/devices.go Normal file
View File

@ -0,0 +1,243 @@
package router
import (
"encoding/json"
"etrismartfarmpoc/model"
"etrismartfarmpoc/watcher"
"fmt"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
)
var waitPermission = map[string]chan bool{}
var mutex sync.Mutex
var discoveredDevices = []*model.Device{}
func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
encoder := json.NewEncoder(w)
encoder.Encode(discoveredDevices)
}
func removeDevice(device *model.Device) {
for i, e := range discoveredDevices {
if e == device {
discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1]
discoveredDevices = discoveredDevices[:len(discoveredDevices)-1]
}
}
}
func GetDevices(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
devices, _, err := db.GetDevices()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
encoder := json.NewEncoder(w)
encoder.Encode(devices)
}
// PostDevice : Handle for Receiving discovery message
func PostDevice(w http.ResponseWriter, r *http.Request) {
var device = &model.Device{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(device)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
// 등록된 제어기로 부터 전송된 요청 메시지임을 확인
if !db.IsExistController(device.CID) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Wrong Controller ID"))
return
}
if !db.IsExistDevice(device.DName) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Already exist device"))
return
}
// 장치 ID 생성 및 탐색된 장치 추가
device.DID = uuid.NewString()
mutex.Lock()
waitPermission[device.DID] = make(chan bool)
discoveredDevices = append(discoveredDevices, device)
// 관리자에게 탐색을 알림
sendNotification(&Notification{Msg: "Add discovered device"})
mutex.Unlock()
timer := time.NewTimer(20 * time.Second)
select {
case <-r.Context().Done():
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
sendNotification(&Notification{Msg: "Remove discovered device"})
w.WriteHeader(http.StatusOK)
w.Write([]byte("This operation is not permitted"))
return
case <-timer.C:
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
sendNotification(&Notification{Msg: "Remove discovered device"})
w.WriteHeader(http.StatusOK)
w.Write([]byte("This operation is not permitted"))
return
case b, ok := <-waitPermission[device.DID]:
if !ok {
return
}
if b {
// 디바이스 등록 절차 수행
db.AddDevice(device)
db.AddService(device.SName)
// 디바이스 등록 알림
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(device)
sendNotification(&Notification{Msg: "Added Device"})
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
sendNotification(&Notification{Msg: "Add device"})
} else {
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
sendNotification(&Notification{Msg: "Remove discovered device"})
w.WriteHeader(http.StatusOK)
w.Write([]byte("This operation is not permitted"))
}
}
}
func DeleteDevice(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
msg := map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&msg)
device, err := db.QueryDevice(msg["dname"])
db.DeleteDevice(device)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}
sendNotification(&Notification{Msg: "Delete device"})
w.WriteHeader(http.StatusOK)
}
func PutDevice(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
msg := map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&msg)
fmt.Println("msg[did]: ", msg["did"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}
ch, ok := waitPermission[msg["did"]]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Wrong Device ID is Sended"))
return
}
ch <- true
w.WriteHeader(http.StatusOK)
}
// 값 변경 알림
func PutDeviceStatus(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("not found did"))
return
}
if r.ContentLength == 0 {
w.WriteHeader(http.StatusOK)
return
}
watch, ok := watchers[did]
if !ok {
w.WriteHeader(http.StatusOK)
return
}
param := map[string]interface{}{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&param)
if err != nil {
w.WriteHeader(http.StatusOK)
return
}
watch.Notify(watcher.NewStateChangedEvent(nil, param))
w.WriteHeader(http.StatusOK)
}
// func PostDeviceStatus(w http.ResponseWriter, r *http.Request) {
// vars := mux.Vars(r)
// did, ok := vars["id"]
// if !ok {
// w.WriteHeader(http.StatusBadRequest)
// w.Write([]byte("not found did"))
// return
// }
// if r.ContentLength == 0 {
// w.WriteHeader(http.StatusOK)
// return
// }
// watch, ok := watchers[did]
// if !ok {
// w.WriteHeader(http.StatusOK)
// return
// }
// param := map[string]interface{}{}
// decoder := json.NewDecoder(r.Body)
// err := decoder.Decode(&param)
// if err != nil {
// w.WriteHeader(http.StatusOK)
// return
// }
// watch.Notify(watcher.NewStateChangedEvent(nil, param))
// }

143
router/notification.go Normal file
View File

@ -0,0 +1,143 @@
package router
import (
"etrismartfarmpoc/watcher"
"fmt"
"log"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
var notifications []chan *Notification
var notiMutex sync.Mutex
func sendNotification(noti *Notification) {
// notiMutex.Lock()
// defer notiMutex.Unlock()
for _, ch := range notifications {
ch <- noti
}
}
func removeNotification(noti chan *Notification) {
notiMutex.Lock()
defer notiMutex.Unlock()
for i, e := range notifications {
if e == noti {
notifications[i] = notifications[len(notifications)-1]
notifications = notifications[:len(notifications)-1]
}
}
}
func GetNotification(w http.ResponseWriter, r *http.Request) {
notiChan := make(chan *Notification, 1)
notiMutex.Lock()
notifications = append(notifications, notiChan)
notiMutex.Unlock()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
for {
notification := <-notiChan
// fmt.Println("Write!!", discoveredDevices)
if conn.WriteJSON(notification) != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
removeNotification(notiChan)
notiChan = nil
return
}
}
}
var watchers = map[string]*watcher.DeviceWatcher{}
func GetDeviceWatch(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
did, ok := vars["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("not found did"))
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
if !checkDid(did) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("not exist did"))
return
}
s := make(chan map[string]interface{})
defer close(s)
ec := make(chan error)
defer close(ec)
watch, ok := watchers[did]
if !ok {
watch = &watcher.DeviceWatcher{}
watchers[did] = watch
}
watch.Subscribe(s)
defer func() {
cnt := watch.Desubscribe(s)
if cnt == 0 {
delete(watchers, did)
log.Println("delete watchers: ", watchers)
}
}()
go func() {
_, _, err := conn.ReadMessage()
if err != nil {
ec <- err
}
}()
for {
select {
case param := <-s:
if conn.WriteJSON(param) != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case <-r.Context().Done():
fmt.Println("see you later~")
w.WriteHeader(http.StatusOK)
w.Write([]byte("See you later~"))
return
case <-ec:
return
}
}
}
func checkDid(did string) bool {
return true
}

View File

@ -2,24 +2,16 @@ package router
import (
"encoding/json"
"etrismartfarmpoc/containermgmt"
"etrismartfarmpoc/model"
"fmt"
"io"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/urfave/negroni"
)
type Notification struct {
Msg string `json:msg`
Msg string `json:"msg"`
}
var db model.DBHandler
@ -43,7 +35,10 @@ func NewRouter() http.Handler {
mux.HandleFunc("/services", PutServices).Methods("PUT")
mux.HandleFunc("/devices", PostDevice).Methods("POST")
mux.HandleFunc("/devices", GetDevices).Methods("GET")
mux.HandleFunc("/devices", DeleteDevice).Methods("DELETE")
mux.HandleFunc("/devices", PutDevice).Methods("PUT", "OPTIONS")
mux.HandleFunc("/device/{id}", GetDeviceWatch).Methods("GET")
mux.HandleFunc("/device/{id}", PutDeviceStatus).Methods("PUT")
mux.HandleFunc("/discover", GetDiscoveredDevices).Methods("GET")
mux.PathPrefix("/services/").HandlerFunc(RouteRequestToService)
@ -70,115 +65,6 @@ func EchoRoute(w http.ResponseWriter, r *http.Request) {
encoder.Encode(m)
}
func GetControllerList(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
list, err := db.GetControllers()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
fmt.Println(list)
encoder := json.NewEncoder(w)
err = encoder.Encode(list)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
}
func PostController(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
controller, err := db.AddController(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
encoder := json.NewEncoder(w)
err = encoder.Encode(controller)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
sendNotification(&Notification{Msg: "Added Controller"})
}
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
var notifications []chan *Notification
var notiMutex sync.Mutex
func sendNotification(noti *Notification) {
// notiMutex.Lock()
// defer notiMutex.Unlock()
for _, ch := range notifications {
ch <- noti
}
}
func removeNotification(noti chan *Notification) {
notiMutex.Lock()
defer notiMutex.Unlock()
for i, e := range notifications {
if e == noti {
discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1]
discoveredDevices = discoveredDevices[:len(discoveredDevices)-1]
}
}
}
func GetNotification(w http.ResponseWriter, r *http.Request) {
notiChan := make(chan *Notification, 1)
notiMutex.Lock()
notifications = append(notifications, notiChan)
notiMutex.Unlock()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
for {
notification := <-notiChan
// fmt.Println("Write!!", discoveredDevices)
if conn.WriteJSON(notification) != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
removeNotification(notiChan)
notiChan = nil
return
}
}
}
func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
devices, _, err := db.GetDevices()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
encoder := json.NewEncoder(w)
encoder.Encode(devices)
}
// func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) {
// noti := make(chan string, 1)
// mutex.Lock()
@ -201,279 +87,6 @@ func GetDiscoveredDevices(w http.ResponseWriter, r *http.Request) {
// }
// }
var waitPermission = map[string]chan bool{}
var mutex sync.Mutex
var discoveredDevices []*model.Device
var discoveredNotifications []chan string
func removeDevice(device *model.Device) {
for i, e := range discoveredDevices {
if e == device {
discoveredDevices[i] = discoveredDevices[len(discoveredDevices)-1]
discoveredDevices = discoveredDevices[:len(discoveredDevices)-1]
}
}
}
func GetDevices(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
devices, _, err := db.GetDevices()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
encoder := json.NewEncoder(w)
encoder.Encode(devices)
}
// PostDevice : Handle for Receiving discovery message
func PostDevice(w http.ResponseWriter, r *http.Request) {
var device = &model.Device{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(device)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
// 등록된 제어기로 부터 전송된 요청 메시지임을 확인
if !db.IsExistController(device.CID) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Wrong Controller ID"))
return
}
// 장치 ID 생성 및 탐색된 장치 추가
device.DID = uuid.NewString()
mutex.Lock()
waitPermission[device.DID] = make(chan bool)
discoveredDevices = append(discoveredDevices, device)
// 관리자에게 탐색을 알림
for _, noti := range discoveredNotifications {
noti <- device.DID
}
mutex.Unlock()
timer := time.NewTimer(20 * time.Second)
select {
case <-r.Context().Done():
fmt.Println("Done!!")
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
for _, noti := range discoveredNotifications {
noti <- device.DID
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("This operation is not permitted"))
return
case <-timer.C:
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
for _, noti := range discoveredNotifications {
noti <- device.DID
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("This operation is not permitted"))
return
case b := <-waitPermission[device.DID]:
if b {
// 디바이스 등록 절차 수행
db.AddDevice(device)
db.AddService(device.SName)
// 디바이스 등록 알림
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(device)
sendNotification(&Notification{Msg: "Added Device"})
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
for _, noti := range discoveredNotifications {
noti <- device.DID
}
} else {
mutex.Lock()
defer mutex.Unlock()
delete(waitPermission, device.DID)
removeDevice(device)
for _, noti := range discoveredNotifications {
noti <- device.DID
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("This operation is not permitted"))
}
}
}
func PutDevice(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
msg := map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&msg)
fmt.Println("msg[did]: ", msg["did"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}
ch, ok := waitPermission[msg["did"]]
if !ok {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Wrong Device ID is Sended"))
return
}
ch <- true
w.WriteHeader(http.StatusOK)
}
func GetServices(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
if r.ContentLength != 0 {
obj := map[string]string{}
err := json.NewDecoder(r.Body).Decode(&obj)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
sid, err := db.GetSID(obj["sname"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
w.Write([]byte(sid))
return
}
l, err := db.GetServices()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
err = json.NewEncoder(w).Encode(l)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
}
func PutServices(w http.ResponseWriter, r *http.Request) {
var obj = map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&obj)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
s, err := db.UpdateService(obj["name"], obj["addr"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
sendNotification(&Notification{Msg: "Update Service"})
err = json.NewEncoder(w).Encode(s)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
}
// func GetServiceList(w http.ResponseWriter, r *http.Request) {
// result := containermgmt.GetContainers(context.Background())
// b := result.Value(containermgmt.ReturnKey).([]byte)
// w.Write(b)
// }
func PostService(w http.ResponseWriter, r *http.Request) {
var obj = map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&obj)
db.IsExistService(obj["name"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
err = containermgmt.CreateContainer(obj["name"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
func RouteRequestToService(w http.ResponseWriter, r *http.Request) {
l := len([]rune("/services/"))
var id string
var url string
idx := strings.Index(r.RequestURI[l:], "/")
if idx == -1 {
id = r.RequestURI[l:]
url = "/"
} else {
id = r.RequestURI[l : l+idx]
if len([]rune(r.RequestURI)) <= l+idx {
url = "/"
} else {
url = r.RequestURI[l+idx:]
}
}
// w.Write([]byte(vars["url"]))
host, err := db.GetAddr(id)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
log.Println("Route to", host+"/"+url)
req, err := http.NewRequest(r.Method, "http://"+host+"/"+url, r.Body)
if err != nil {
// 잘못된 메시지 포맷이 전달된 경우
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err)
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
// 잘못된 메시지 포맷이 전달된 경우
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err)
return
}
io.Copy(w, resp.Body)
}
// func mapService(urls []string) {
// }

139
router/services.go Normal file
View File

@ -0,0 +1,139 @@
package router
import (
"encoding/json"
"etrismartfarmpoc/containermgmt"
"fmt"
"io"
"log"
"net/http"
"strings"
)
func GetServices(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
sname := r.Header.Get("sname")
if len(sname) != 0 {
sid, err := db.GetSID(sname)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
w.Write([]byte(sid))
return
}
l, err := db.GetServices()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
err = json.NewEncoder(w).Encode(l)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
}
func PutServices(w http.ResponseWriter, r *http.Request) {
var obj = map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&obj)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
s, err := db.UpdateService(obj["name"], obj["addr"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
sendNotification(&Notification{Msg: "Update Service"})
err = json.NewEncoder(w).Encode(s)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
}
// func GetServiceList(w http.ResponseWriter, r *http.Request) {
// result := containermgmt.GetContainers(context.Background())
// b := result.Value(containermgmt.ReturnKey).([]byte)
// w.Write(b)
// }
func PostService(w http.ResponseWriter, r *http.Request) {
var obj = map[string]string{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&obj)
db.IsExistService(obj["name"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
err = containermgmt.CreateContainer(obj["name"])
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
func RouteRequestToService(w http.ResponseWriter, r *http.Request) {
l := len([]rune("/services/"))
var id string
var url string
idx := strings.Index(r.RequestURI[l:], "/")
if idx == -1 {
id = r.RequestURI[l:]
url = "/"
} else {
id = r.RequestURI[l : l+idx]
if len([]rune(r.RequestURI)) <= l+idx {
url = "/"
} else {
url = r.RequestURI[l+idx:]
}
}
// w.Write([]byte(vars["url"]))
host, err := db.GetAddr(id)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
log.Println("Route to", host+url)
req, err := http.NewRequest(r.Method, "http://"+host+url, r.Body)
if err != nil {
// 잘못된 메시지 포맷이 전달된 경우
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err)
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
// 잘못된 메시지 포맷이 전달된 경우
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, err)
return
}
io.Copy(w, resp.Body)
}

5
watcher/cli.go Normal file
View File

@ -0,0 +1,5 @@
package watcher
func NewStateChangedEvent(t interface{}, param map[string]interface{}) *StatusChangedEvent {
return &StatusChangedEvent{t: t, param: param}
}

32
watcher/device_watcher.go Normal file
View File

@ -0,0 +1,32 @@
package watcher
type DeviceWatcher struct {
// did string
subscriber []chan map[string]interface{}
}
func (dw *DeviceWatcher) Notify(e Event) {
dw.onChanged(e)
}
func (dw *DeviceWatcher) onChanged(e Event) {
for _, s := range dw.subscriber {
s <- e.Param()
}
}
func (dw *DeviceWatcher) Subscribe(s chan map[string]interface{}) {
dw.subscriber = append(dw.subscriber, s)
}
func (dw *DeviceWatcher) Desubscribe(s chan map[string]interface{}) int {
length := len(dw.subscriber)
for i, e := range dw.subscriber {
if s == e {
dw.subscriber[i] = dw.subscriber[length-1]
dw.subscriber = dw.subscriber[:length-1]
return length - 1
}
}
return length
}

13
watcher/interface.go Normal file
View File

@ -0,0 +1,13 @@
package watcher
type Watcher interface {
}
type Event interface {
Type() interface{}
Param() map[string]interface{}
}
type EventListener interface {
Handle(e Event)
}

44
watcher/status_changed.go Normal file
View File

@ -0,0 +1,44 @@
package watcher
type StatusChangedEvent struct {
Event
t interface{}
param map[string]interface{}
}
func (e *StatusChangedEvent) Type() interface{} {
return e.t
}
func (e *StatusChangedEvent) Param() map[string]interface{} {
return e.param
}
// type StatusChangedListener struct {
// h func(e Event)
// }
// func (l *StatusChangedListener) Handle(e Event) {
// if l.h != nil {
// l.h(e)
// }
// }
// func (l *StatusChangedListener) AddSubscriber(s chan map[string]interface{}) {
// l.subscriber = append(l.subscriber, s)
// }
// func (l *StatusChangedListener) RemoveSubscriber(s chan map[string]interface{}) int {
// length := len(l.subscriber)
// for i, e := range l.subscriber {
// if s == e {
// l.subscriber[i] = l.subscriber[length-1]
// l.subscriber = l.subscriber[:length-1]
// return length - 1
// }
// }
// return length
// }
// func (l *StatusChangedListener) onClose() {}