Bladeren bron

init commit

master
seunghwan 4 jaren geleden
commit
81ddb9ba32
78 gewijzigde bestanden met toevoegingen van 365394 en 0 verwijderingen
  1. BIN
      .DS_Store
  2. 8
    0
      .idea/.gitignore
  3. 9
    0
      .idea/Moscato_Messaging_Middleware.iml
  4. 8
    0
      .idea/modules.xml
  5. 41
    0
      .idea/vcs.xml
  6. 7
    0
      README.md
  7. BIN
      bin/readme
  8. BIN
      broker
  9. BIN
      pkg/darwin_amd64/github.com/facebookgo/inject.a
  10. BIN
      pkg/darwin_amd64/github.com/facebookgo/inject/injecttesta.a
  11. BIN
      pkg/darwin_amd64/github.com/facebookgo/inject/injecttestb.a
  12. BIN
      pkg/darwin_amd64/github.com/juliangruber/go-intersect.a
  13. BIN
      pkg/darwin_amd64/go.uber.org/zap.a
  14. BIN
      pkg/darwin_amd64/go.uber.org/zap/benchmarks.a
  15. BIN
      pkg/darwin_amd64/go.uber.org/zap/buffer.a
  16. BIN
      pkg/darwin_amd64/go.uber.org/zap/internal/bufferpool.a
  17. BIN
      pkg/darwin_amd64/go.uber.org/zap/internal/color.a
  18. BIN
      pkg/darwin_amd64/go.uber.org/zap/internal/exit.a
  19. BIN
      pkg/darwin_amd64/go.uber.org/zap/internal/ztest.a
  20. BIN
      pkg/darwin_amd64/go.uber.org/zap/zapcore.a
  21. BIN
      pkg/darwin_amd64/go.uber.org/zap/zapgrpc.a
  22. BIN
      pkg/darwin_amd64/go.uber.org/zap/zapgrpc/internal/test.a
  23. BIN
      pkg/darwin_amd64/go.uber.org/zap/zapio.a
  24. BIN
      pkg/darwin_amd64/go.uber.org/zap/zaptest.a
  25. BIN
      pkg/darwin_amd64/go.uber.org/zap/zaptest/observer.a
  26. 1
    0
      send2MsTime.log
  27. BIN
      src/.DS_Store
  28. BIN
      src/broker/.DS_Store
  29. 8
    0
      src/broker/main.go
  30. 30
    0
      src/broker/modules/AppConfig.go
  31. 74
    0
      src/broker/modules/image_matching.go
  32. 378
    0
      src/broker/modules/init.go
  33. 292
    0
      src/broker/modules/list.go
  34. 174
    0
      src/broker/modules/list_test.go
  35. 40
    0
      src/broker/modules/logConfig.go
  36. 87
    0
      src/broker/modules/manage.go
  37. 170
    0
      src/broker/modules/matching.go
  38. 170
    0
      src/broker/modules/matching_test.go
  39. 152
    0
      src/broker/modules/message.go
  40. 55
    0
      src/broker/modules/queue.go
  41. 102
    0
      src/broker/modules/secure.go
  42. 139
    0
      src/broker/modules/secure_test.go
  43. 301
    0
      src/broker/modules/subscription.go
  44. 289
    0
      src/broker/modules/subscription_test.go
  45. 1
    0
      src/github.com/apex/log
  46. 1
    0
      src/github.com/benbjohnson/clock
  47. 1
    0
      src/github.com/bmizerany/assert
  48. 1
    0
      src/github.com/davecgh/go-spew
  49. 1
    0
      src/github.com/facebookgo/ensure
  50. 1
    0
      src/github.com/facebookgo/inject
  51. 1
    0
      src/github.com/facebookgo/stack
  52. 1
    0
      src/github.com/facebookgo/structtag
  53. 1
    0
      src/github.com/facebookgo/subset
  54. 1
    0
      src/github.com/go-kit/kit
  55. 1
    0
      src/github.com/go-kit/log
  56. 1
    0
      src/github.com/go-logfmt/logfmt
  57. 1
    0
      src/github.com/go-stack/stack
  58. 1
    0
      src/github.com/juliangruber/go-intersect
  59. 1
    0
      src/github.com/kr/pretty
  60. 1
    0
      src/github.com/kr/text
  61. 1
    0
      src/github.com/mattn/go-colorable
  62. 1
    0
      src/github.com/mattn/go-isatty
  63. 1
    0
      src/github.com/pkg/errors
  64. 1
    0
      src/github.com/pmezard/go-difflib
  65. 1
    0
      src/github.com/rogpeppe/go-internal
  66. 1
    0
      src/github.com/rs/zerolog
  67. 1
    0
      src/github.com/sirupsen/logrus
  68. 1
    0
      src/github.com/stretchr/testify
  69. 1
    0
      src/go.uber.org/atomic
  70. 1
    0
      src/go.uber.org/goleak
  71. 1
    0
      src/go.uber.org/multierr
  72. 1
    0
      src/go.uber.org/zap
  73. 1
    0
      src/golang.org/x/sys
  74. 1
    0
      src/google.golang.org/grpc
  75. 1
    0
      src/gopkg.in/inconshreveable/log15.v2
  76. 1
    0
      src/gopkg.in/yaml.v2
  77. 1
    0
      src/gopkg.in/yaml.v3
  78. 362826
    0
      test.log

BIN
.DS_Store Bestand weergeven


+ 8
- 0
.idea/.gitignore Bestand weergeven

@@ -0,0 +1,8 @@
1
+# Default ignored files
2
+/shelf/
3
+/workspace.xml
4
+# Datasource local storage ignored files
5
+/dataSources/
6
+/dataSources.local.xml
7
+# Editor-based HTTP Client requests
8
+/httpRequests/

+ 9
- 0
.idea/Moscato_Messaging_Middleware.iml Bestand weergeven

@@ -0,0 +1,9 @@
1
+<?xml version="1.0" encoding="UTF-8"?>
2
+<module type="WEB_MODULE" version="4">
3
+  <component name="Go" enabled="true" />
4
+  <component name="NewModuleRootManager">
5
+    <content url="file://$MODULE_DIR$" />
6
+    <orderEntry type="inheritedJdk" />
7
+    <orderEntry type="sourceFolder" forTests="false" />
8
+  </component>
9
+</module>

+ 8
- 0
.idea/modules.xml Bestand weergeven

@@ -0,0 +1,8 @@
1
+<?xml version="1.0" encoding="UTF-8"?>
2
+<project version="4">
3
+  <component name="ProjectModuleManager">
4
+    <modules>
5
+      <module fileurl="file://$PROJECT_DIR$/.idea/Moscato_Messaging_Middleware.iml" filepath="$PROJECT_DIR$/.idea/Moscato_Messaging_Middleware.iml" />
6
+    </modules>
7
+  </component>
8
+</project>

+ 41
- 0
.idea/vcs.xml Bestand weergeven

@@ -0,0 +1,41 @@
1
+<?xml version="1.0" encoding="UTF-8"?>
2
+<project version="4">
3
+  <component name="VcsDirectoryMappings">
4
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
5
+    <mapping directory="$PROJECT_DIR$/src/github.com/apex/log" vcs="Git" />
6
+    <mapping directory="$PROJECT_DIR$/src/github.com/benbjohnson/clock" vcs="Git" />
7
+    <mapping directory="$PROJECT_DIR$/src/github.com/bmizerany/assert" vcs="Git" />
8
+    <mapping directory="$PROJECT_DIR$/src/github.com/davecgh/go-spew" vcs="Git" />
9
+    <mapping directory="$PROJECT_DIR$/src/github.com/facebookgo/ensure" vcs="Git" />
10
+    <mapping directory="$PROJECT_DIR$/src/github.com/facebookgo/inject" vcs="Git" />
11
+    <mapping directory="$PROJECT_DIR$/src/github.com/facebookgo/stack" vcs="Git" />
12
+    <mapping directory="$PROJECT_DIR$/src/github.com/facebookgo/structtag" vcs="Git" />
13
+    <mapping directory="$PROJECT_DIR$/src/github.com/facebookgo/subset" vcs="Git" />
14
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-kit/kit" vcs="Git" />
15
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-kit/log" vcs="Git" />
16
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-logfmt/logfmt" vcs="Git" />
17
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-stack/stack" vcs="Git" />
18
+    <mapping directory="$PROJECT_DIR$/src/github.com/jeanphorn/log4go" vcs="Git" />
19
+    <mapping directory="$PROJECT_DIR$/src/github.com/juliangruber/go-intersect" vcs="Git" />
20
+    <mapping directory="$PROJECT_DIR$/src/github.com/kr/pretty" vcs="Git" />
21
+    <mapping directory="$PROJECT_DIR$/src/github.com/kr/text" vcs="Git" />
22
+    <mapping directory="$PROJECT_DIR$/src/github.com/mattn/go-colorable" vcs="Git" />
23
+    <mapping directory="$PROJECT_DIR$/src/github.com/mattn/go-isatty" vcs="Git" />
24
+    <mapping directory="$PROJECT_DIR$/src/github.com/pkg/errors" vcs="Git" />
25
+    <mapping directory="$PROJECT_DIR$/src/github.com/pmezard/go-difflib" vcs="Git" />
26
+    <mapping directory="$PROJECT_DIR$/src/github.com/rogpeppe/go-internal" vcs="Git" />
27
+    <mapping directory="$PROJECT_DIR$/src/github.com/rs/zerolog" vcs="Git" />
28
+    <mapping directory="$PROJECT_DIR$/src/github.com/sirupsen/logrus" vcs="Git" />
29
+    <mapping directory="$PROJECT_DIR$/src/github.com/stretchr/testify" vcs="Git" />
30
+    <mapping directory="$PROJECT_DIR$/src/github.com/toolkits/file" vcs="Git" />
31
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/atomic" vcs="Git" />
32
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/goleak" vcs="Git" />
33
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/multierr" vcs="Git" />
34
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/zap" vcs="Git" />
35
+    <mapping directory="$PROJECT_DIR$/src/golang.org/x/sys" vcs="Git" />
36
+    <mapping directory="$PROJECT_DIR$/src/google.golang.org/grpc" vcs="Git" />
37
+    <mapping directory="$PROJECT_DIR$/src/gopkg.in/inconshreveable/log15.v2" vcs="Git" />
38
+    <mapping directory="$PROJECT_DIR$/src/gopkg.in/yaml.v2" vcs="Git" />
39
+    <mapping directory="$PROJECT_DIR$/src/gopkg.in/yaml.v3" vcs="Git" />
40
+  </component>
41
+</project>

+ 7
- 0
README.md Bestand weergeven

@@ -0,0 +1,7 @@
1
+# Moscato_Messaging_Middleware
2
+
3
+Project Moscato
4
+Team Messaging Middleware
5
+
6
+Implemetation Message Middleware by Golang
7
+Operate as Secure, Effectively  

BIN
bin/readme Bestand weergeven


BIN
broker Bestand weergeven


BIN
pkg/darwin_amd64/github.com/facebookgo/inject.a Bestand weergeven


BIN
pkg/darwin_amd64/github.com/facebookgo/inject/injecttesta.a Bestand weergeven


BIN
pkg/darwin_amd64/github.com/facebookgo/inject/injecttestb.a Bestand weergeven


BIN
pkg/darwin_amd64/github.com/juliangruber/go-intersect.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/benchmarks.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/buffer.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/internal/bufferpool.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/internal/color.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/internal/exit.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/internal/ztest.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/zapcore.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/zapgrpc.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/zapgrpc/internal/test.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/zapio.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/zaptest.a Bestand weergeven


BIN
pkg/darwin_amd64/go.uber.org/zap/zaptest/observer.a Bestand weergeven


+ 1
- 0
send2MsTime.log Bestand weergeven

@@ -0,0 +1 @@
1
+28500

BIN
src/.DS_Store Bestand weergeven


BIN
src/broker/.DS_Store Bestand weergeven


+ 8
- 0
src/broker/main.go Bestand weergeven

@@ -0,0 +1,8 @@
1
+package main
2
+
3
+import "broker/modules"
4
+
5
+func main() {
6
+	moscato := modules.Moscato{}
7
+	moscato.Run()
8
+}

+ 30
- 0
src/broker/modules/AppConfig.go Bestand weergeven

@@ -0,0 +1,30 @@
1
+package modules
2
+
3
+import (
4
+	"github.com/facebookgo/inject"
5
+)
6
+
7
+type AppConfig struct {
8
+	application *Moscato
9
+}
10
+
11
+func (app *AppConfig) config() *Moscato {
12
+	var graph inject.Graph
13
+
14
+	err := graph.Provide(
15
+		&inject.Object{Value: NewMStable()},
16
+		&inject.Object{Value: app.application},
17
+		&inject.Object{Value: NewSecurity()})
18
+
19
+	if err != nil {
20
+		println(err)
21
+		return nil
22
+	}
23
+
24
+	err = graph.Populate()
25
+	if err != nil {
26
+		println(err)
27
+		return nil
28
+	}
29
+	return app.application
30
+}

+ 74
- 0
src/broker/modules/image_matching.go Bestand weergeven

@@ -0,0 +1,74 @@
1
+package modules
2
+
3
+import (
4
+	"errors"
5
+	"fmt"
6
+	"math"
7
+)
8
+
9
+func Cosine(a []float64, b []float64) (cosine float64, err error) {
10
+	count := 0
11
+	length_a := len(a)
12
+	length_b := len(b)
13
+	if length_a > length_b {
14
+		count = length_a
15
+	} else {
16
+		count = length_b
17
+	}
18
+	sumA := 0.0
19
+	s1 := 0.0
20
+	s2 := 0.0
21
+	for k := 0; k < count; k++ {
22
+		if k >= length_a {
23
+			s2 += math.Pow(b[k], 2)
24
+			continue
25
+		}
26
+		if k >= length_b {
27
+			s1 += math.Pow(a[k], 2)
28
+			continue
29
+		}
30
+		sumA += a[k] * b[k]
31
+		s1 += math.Pow(a[k], 2)
32
+		s2 += math.Pow(b[k], 2)
33
+	}
34
+	if s1 == 0 || s2 == 0 {
35
+		return 0.0, errors.New("Vectors should not be null (all zeros)")
36
+	}
37
+	return sumA / (math.Sqrt(s1) * math.Sqrt(s2)), nil
38
+}
39
+
40
+func (moscato *Moscato) ImageMatching(msg MsgUnit) {
41
+
42
+	fmt.Println("gogoman")
43
+	topic := msg.(PublishedImage).Topic
44
+	sub_mng := moscato.SubscriptionManager
45
+
46
+	// iplist for return
47
+	//ipList := make([]string, 0)
48
+
49
+	topicPtr := sub_mng.list
50
+	ptr := topicPtr.getTopicBySimilarity(topic)
51
+
52
+	// No topicNode
53
+	if len(ptr) == 0 {
54
+		fmt.Println("error")
55
+		moscato.SendQueue <- myType{nil, msg, errors.New("No Exist Matching Topic")}
56
+	} else {
57
+
58
+		//find exact topic match
59
+		fmt.Println("here1")
60
+		valPtr := ptr[0]
61
+		fmt.Println(valPtr.topic)
62
+		iplist := make([]string, 0)
63
+		for i := 0; i < topicPtr.size; i++ {
64
+			pnt := sub_mng.subnum2conv[i]
65
+			compare := ExactCompare(valPtr.topic, pnt)
66
+			if compare == 1 {
67
+				ipAddr := sub_mng.sub2ip[i]
68
+				iplist = append(iplist, ipAddr)
69
+			}
70
+		}
71
+		moscato.MatchingManager.match_count++
72
+		moscato.SendQueue <- myType{iplist, msg, nil}
73
+	}
74
+}

+ 378
- 0
src/broker/modules/init.go Bestand weergeven

@@ -0,0 +1,378 @@
1
+package modules
2
+
3
+import (
4
+	"encoding/json"
5
+	"errors"
6
+	"fmt"
7
+	"net"
8
+	"net/rpc"
9
+	"os"
10
+	"os/signal"
11
+	"strconv"
12
+	"syscall"
13
+	"time"
14
+)
15
+
16
+type Moscato struct {
17
+	queue               MsgQueue
18
+	SendQueue           chan myType
19
+	MicroServiceManager NodeManager `inject:""`
20
+	MatchingManager     match_manager
21
+	SubscriptionManager sub_manager
22
+	SecureManager       SecurityManager `inject:""`
23
+}
24
+
25
+type myType struct {
26
+	subList []string
27
+	pubMsg  MsgUnit
28
+	err     error
29
+}
30
+
31
+type Reply struct { //RPC리턴값
32
+	CompleteLog string //제대로 받았는지 확인하는 로그
33
+}
34
+
35
+type Receiver struct { //RPC 서버에 등록하기 위한 변수
36
+	moscato *Moscato
37
+}
38
+
39
+type Args struct { // 매개변수
40
+	JsonMsg []byte
41
+	Kind    int
42
+}
43
+
44
+/*
45
+MS→MM
46
+
47
+-MM 실행되면 MM서버는 열려있음(MS은 자동으로 Client)(포트 8150)
48
+
49
+-Send2MM 호출 → rpc call로 MmReceive호출해서 MM로 전달(json형식)
50
+
51
+-MmReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MM의 Receive로 보냄
52
+
53
+-MM의 Receive에서 해당 Message를 처리
54
+*/
55
+
56
+func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
57
+	// 메세지 별로 나눠서 언마샬
58
+	switch args.Kind {
59
+
60
+	case KSM:
61
+		var msg KeyShareMsg
62
+		err := json.Unmarshal(args.JsonMsg, &msg)
63
+		if err != nil {
64
+			return err
65
+		}
66
+		go func() {
67
+			_, err := receiver.moscato.Receive(msg)
68
+			if err != nil {
69
+
70
+			}
71
+		}()
72
+		reply.CompleteLog = "KSM received"
73
+	case PM:
74
+		var msg PublishedImage
75
+		//fmt.Println(args.JsonMsg)
76
+		err := json.Unmarshal(args.JsonMsg, &msg)
77
+		//fmt.Println(msg)
78
+		if err != nil {
79
+			return err
80
+		}
81
+
82
+		go func() {
83
+			_, err := receiver.moscato.Receive(msg)
84
+			if err != nil {
85
+
86
+			}
87
+		}()
88
+		reply.CompleteLog = "PM received"
89
+	case SM:
90
+		var msg SubscriptionImage
91
+		err := json.Unmarshal(args.JsonMsg, &msg)
92
+		if err != nil {
93
+			return err
94
+		}
95
+		go func() {
96
+			_, err := receiver.moscato.Receive(msg)
97
+			if err != nil {
98
+
99
+			}
100
+		}()
101
+		reply.CompleteLog = "SM received"
102
+	case RM:
103
+		var msg RegisterMsg
104
+		err := json.Unmarshal(args.JsonMsg, &msg)
105
+		if err != nil {
106
+			return err
107
+		}
108
+		go func() {
109
+			_, err := receiver.moscato.Receive(msg)
110
+			if err != nil {
111
+				fmt.Println(err)
112
+			}
113
+		}()
114
+		reply.CompleteLog = "RM received"
115
+	case WM:
116
+		var msg WithdrawMsg
117
+		err := json.Unmarshal(args.JsonMsg, &msg)
118
+		if err != nil {
119
+			return err
120
+		}
121
+		go func() {
122
+			_, err := receiver.moscato.Receive(msg)
123
+			if err != nil {
124
+
125
+			}
126
+		}()
127
+		reply.CompleteLog = "WM received"
128
+	default:
129
+		return errors.New("message type Error: Not registered message type")
130
+	}
131
+	//reply.CompleteLog = "received completely"
132
+	return nil
133
+}
134
+
135
+//Recieve - MM가 MS로부터 메세지 전달받음
136
+func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
137
+	logger := NewMyLogger()
138
+	defer logger.Sync()
139
+
140
+	//rpc call
141
+	var msg_type = msg.CheckType()
142
+	//메세지 타입에 따라 다르게 처리
143
+	switch msg_type {
144
+
145
+	case KSM: //Key share msg
146
+
147
+	case PM: //Publish msg
148
+		//log.Println("PM received")
149
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(PublishedImage).From)
150
+		logger.Info("PM received from:[" + fromNodeName + "]")
151
+		moscato.queue.push(moscato.preProcessMsg(msg))
152
+
153
+	case SM: //Subscription msg
154
+		//log.Println("SM received")
155
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(SubscriptionImage).From)
156
+		logger.Info("SM received from:[" + fromNodeName + "]")
157
+		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
158
+		if err != nil {
159
+			logger.Warn(err.Error())
160
+			//return nil, err
161
+		}
162
+
163
+	case RM: //Register msg
164
+		var newMsg RegisterMsg
165
+		newMsg = msg.(RegisterMsg)
166
+		logger.Info("RM received from:[" + newMsg.From + "]")
167
+
168
+		newNode := MSNode{newMsg.From, newMsg.From}
169
+		resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
170
+		if resultAddNode {
171
+			logger.Info("Node added successful")
172
+		} else {
173
+			logger.Error("Node is already added, ignore RM")
174
+			//log.Println("Node is already added, ignore RM")
175
+			return msg, nil
176
+		}
177
+
178
+		addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
179
+
180
+		moscato.SecureManager.RegKey(newMsg)
181
+		logger.Debug("Registered microservice: address " + addr +
182
+			" / key " + strconv.FormatUint(uint64(moscato.SecureManager.GetNodeKey(newMsg.From)), 10))
183
+
184
+		// ackRM 메세지 전송
185
+		go moscato.Send2MS(addr, newMsg)
186
+
187
+	case WM: //Withdraw msg
188
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(WithdrawMsg).From)
189
+		logger.Info("WM received from:[" + fromNodeName + "]")
190
+		//ip := msg.(WithdrawMsg).From
191
+		//sublist := moscato.SubscriptionManager.ip2sub[ip]
192
+		//fmt.Println("prev list = ", sublist)
193
+		moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
194
+		moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
195
+		//moscato.SubscriptionManager.delete(ip)
196
+		//sublist2 := moscato.SubscriptionManager.ip2sub[ip]
197
+		//fmt.Println("after list =", sublist2)
198
+
199
+	default:
200
+		return nil, errors.New("Message type Error: Not registered message type")
201
+	}
202
+
203
+	return msg, nil
204
+}
205
+
206
+//MS로 보낼때 쓸 함수
207
+
208
+/*
209
+MM→MS
210
+
211
+-MS 실행되면 MS서버는 열려있음(MM은 자동으로 Client)(포트 8160)
212
+
213
+-Send2MS 호출 → rpc call로 MsReceive호출해서 MS로 전달(json형식)
214
+
215
+-MsReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MS의 Receive로 보냄
216
+
217
+-MS의 Receive에서 해당 Message를 처리
218
+*/
219
+
220
+func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
221
+	logger := NewMyLogger()
222
+	defer logger.Sync()
223
+
224
+	client, err := rpc.Dial("tcp", ipaddress+":8150")
225
+	if err != nil {
226
+		fmt.Println(err)
227
+		return
228
+	}
229
+	defer client.Close()
230
+
231
+	reply := new(Reply)
232
+	jmsg, _ := msg.ConvertToJson()
233
+	args := Args{
234
+		JsonMsg: jmsg,
235
+		Kind:    msg.CheckType(),
236
+	}
237
+	err = client.Call("Receiver.Receive", args, reply)
238
+	if err != nil {
239
+		fmt.Println(err)
240
+		return
241
+	}
242
+	//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
243
+	// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
244
+	//log.Println(reply.CompleteLog)
245
+	logger.Debug("HERE" + reply.CompleteLog)
246
+}
247
+
248
+//Matching을 용이하게 하기위한 메세지 가공 과정
249
+func (moscato *Moscato) preProcessMsg(originalMsg MsgUnit) MsgUnit {
250
+	if originalMsg.CheckType() == PM {
251
+		pubMsg := originalMsg.(PublishedImage)
252
+		//for index := 0; index < len(pubMsg.Topic); index++ {
253
+		//	pubMsg.Topic[index] = pubMsg.Topic[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
254
+		//}
255
+		//for index := 0; index < len(pubMsg.Value); index++ {
256
+		//	pubMsg.Value[index] = pubMsg.Value[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
257
+		//}
258
+		return pubMsg
259
+	} else if originalMsg.CheckType() == SM {
260
+		subMsg := originalMsg.(SubscriptionImage)
261
+		//for index := 0; index < len(subMsg.Topic); index++ {
262
+		//	subMsg.Topic[index] = subMsg.Topic[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
263
+		//}
264
+		//for index := 0; index < len(subMsg.Value); index++ {
265
+		//	subMsg.Value[index] = subMsg.Value[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
266
+		//}
267
+		return subMsg
268
+	}
269
+	return nil
270
+}
271
+
272
+//암호화 해서 보내기
273
+func (moscato *Moscato) SendWithEncrypt() MsgUnit {
274
+	for {
275
+		mt := <-moscato.SendQueue
276
+		if mt.err == nil {
277
+			for index := 0; index < len(mt.subList); index++ { //sublist들을 돌면서 매세지를 encrypt하여 메세지 보냄
278
+				tmpNode := mt.subList[index]
279
+				tmpNodeIpAddr, _ := moscato.MicroServiceManager.GetIpaddr(tmpNode)
280
+				//moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode)
281
+				//fmt.Println("publish: ", mt.pubMsg)
282
+				//moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg)
283
+				//moscato.Send2MS(tmpNodeIpAddr, moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode))
284
+				fmt.Println(mt.pubMsg)
285
+				moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg.(PublishedImage))
286
+			}
287
+
288
+		}
289
+		return nil
290
+	}
291
+}
292
+
293
+func (moscato *Moscato) Run() {
294
+	logger := NewMyLogger()
295
+	defer logger.Sync()
296
+
297
+	config := AppConfig{moscato}
298
+	config.config()
299
+
300
+	sigs := make(chan os.Signal, 1)
301
+	done := make(chan bool, 1)
302
+
303
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
304
+
305
+	go func() {
306
+		sig := <-sigs
307
+		//withDraw(message, client)
308
+		//fmt.Println(sig)
309
+		_ = sig
310
+		done <- true
311
+		logger.Info("terminate Moscato Message Middleware")
312
+		os.Exit(0)
313
+	}()
314
+
315
+	//모스카토 구조체 변수 초기화
316
+	receiver := Receiver{moscato: moscato}
317
+	err := moscato.queue.queue_init()
318
+	moscato.SendQueue = make(chan myType)
319
+	moscato.SubscriptionManager.Initialize()
320
+
321
+	if err != nil {
322
+		fmt.Println(err)
323
+		return
324
+	}
325
+
326
+	send2MSFile, _ := os.Create("./send2MsTime.log")
327
+	defer send2MSFile.Close()
328
+	//go routine -> matching 동작
329
+	go func() {
330
+		for {
331
+			msg := moscato.queue.pop(true)
332
+			fmt.Println("gere")
333
+			fmt.Println(msg) // poped msg 확인
334
+			encStartTime := time.Now()
335
+			go moscato.ImageMatching(msg)
336
+			go moscato.SendWithEncrypt()
337
+			encElapsedTime := time.Since(encStartTime)
338
+			fmt.Printf("MM enc시간: %d\n\n", encElapsedTime.Nanoseconds())
339
+			fmt.Fprintln(send2MSFile, encElapsedTime.Nanoseconds())
340
+		}
341
+	}()
342
+
343
+	//go moscato.CheckQueue()
344
+
345
+	//rpc 등록 -> Receive 함수
346
+	err = rpc.Register(receiver)
347
+	if err != nil {
348
+		println(err)
349
+		return
350
+	}
351
+
352
+	go Listen()
353
+	logger.Info("initializing complete")
354
+
355
+	<-done
356
+}
357
+
358
+func Listen() {
359
+	logger := NewMyLogger()
360
+	defer logger.Sync()
361
+	/*
362
+		MS→MM일때 ⇒ port : 8160으로 열기
363
+
364
+		(MM이 Server, MS가 Client)
365
+	*/
366
+
367
+	l, err1 := net.Listen("tcp", ":8160")
368
+
369
+	if err1 != nil {
370
+		logger.Fatal("Unable to listen on given port: " + err1.Error())
371
+	}
372
+	defer l.Close()
373
+
374
+	for {
375
+		conn, _ := l.Accept()
376
+		go rpc.ServeConn(conn)
377
+	}
378
+}

+ 292
- 0
src/broker/modules/list.go Bestand weergeven

@@ -0,0 +1,292 @@
1
+package modules
2
+
3
+/* List to manage Sub information */
4
+
5
+type topicList struct {
6
+	head *topicNode
7
+	tail *topicNode
8
+	size int
9
+}
10
+
11
+type topicNode struct {
12
+	topic []float64 // Encrypted topic
13
+	next  *topicNode
14
+	prev  *topicNode
15
+	list  valueList
16
+}
17
+
18
+type valueList struct {
19
+	head *valueNode
20
+	tail *valueNode
21
+	size int
22
+}
23
+
24
+type valueNode struct {
25
+	val  []int64 // Encrypted value
26
+	next *valueNode
27
+	prev *valueNode
28
+
29
+	// single //
30
+	single2sub_s  []int // (sub_val < pub_val) sub#s
31
+	single2sub_es []int // (sub_val <= pub_val) sub#s
32
+	single2sub_b  []int // (sub_val > pub_val) sub#s
33
+	single2sub_eb []int // (sub_val >= pub_val) sub#s
34
+	single2sub_e  []int // (sub_val == pub_val) sub#s
35
+
36
+	// range //
37
+	range2sub_s  []int // (sub_val < pub_val) sub#s
38
+	range2sub_es []int // (sub_val <= pub_val) sub#s
39
+	range2sub_b  []int // (sub_val > pub_val) sub#s
40
+	range2sub_eb []int // (sub_val >= pub_val) sub#s
41
+}
42
+
43
+// To delete element in slice Array
44
+func remove(ary []int, i int) []int {
45
+	return append(ary[:i], ary[i+1:]...)
46
+}
47
+
48
+// To find a specific sub# in a Value node
49
+func findSub(ary []int, sub int) int {
50
+	for i := 0; i < len(ary); i++ {
51
+		if ary[i] == sub {
52
+			return i
53
+		}
54
+	}
55
+	return -1
56
+}
57
+
58
+func ExactCompare(a []float64, b []float64) int {
59
+	if len(a) != len(b) {
60
+		return 0
61
+	} else {
62
+		for i := 0; i < len(a); i++ {
63
+			if a[i] != b[i] {
64
+				return 0
65
+			}
66
+		}
67
+	}
68
+	return 1
69
+}
70
+
71
+func ConvCompare(a []float64, b []float64) int {
72
+	if len(a) < len(b) {
73
+		return 1
74
+	} else if len(a) > len(b) {
75
+		return -1
76
+	} else {
77
+		for i := 0; i < len(a); i++ {
78
+			if a[i] < b[i] {
79
+				return 1
80
+			} else if a[i] > b[i] {
81
+				return -1
82
+			}
83
+		}
84
+		return 0
85
+	}
86
+}
87
+
88
+// Compare -> To compare two encrypted arrays
89
+func Compare(a []int64, b []int64) int {
90
+	if len(a) < len(b) {
91
+		return 1
92
+	} else if len(a) > len(b) {
93
+		return -1
94
+	} else {
95
+		for i := 0; i < len(a); i++ {
96
+			if a[i] < b[i] {
97
+				return 1
98
+			} else if a[i] > b[i] {
99
+				return -1
100
+			}
101
+		}
102
+		return 0
103
+	}
104
+}
105
+
106
+func (l *topicList) getTopicBySimilarity(topic []float64) []*topicNode {
107
+	var threshold = float64(0.9)
108
+	var ptr []*topicNode
109
+	topicPtr := l.head
110
+	for topicPtr != nil {
111
+		if len(topicPtr.topic) == 0 {
112
+			topicPtr = topicPtr.next
113
+			continue
114
+		}
115
+		cosine, _ := Cosine(topicPtr.topic, topic)
116
+		if cosine > threshold {
117
+			ptr = append(ptr, topicPtr)
118
+		}
119
+		topicPtr = topicPtr.next
120
+	}
121
+	return ptr
122
+}
123
+
124
+// To get the position of a Topic node with a specific Topic
125
+func (l *topicList) getTopicNodePos(topic []float64) *topicNode {
126
+	topicPtr := l.head
127
+	for topicPtr != nil {
128
+		if len(topicPtr.topic) == 0 {
129
+			topicPtr = topicPtr.next
130
+			continue
131
+		}
132
+		if ConvCompare(topicPtr.topic, topic) == 0 {
133
+			return topicPtr
134
+		}
135
+		topicPtr = topicPtr.next
136
+	}
137
+	return nil
138
+}
139
+
140
+func (l *topicList) addConvTopic(topic []float64) {
141
+	newNode := &topicNode{topic, nil, nil, valueList{}}
142
+	if l.head == nil {
143
+		l.head = newNode
144
+		l.tail = l.head
145
+	} else {
146
+		newNode.prev = l.tail
147
+		l.tail.next = newNode
148
+		l.tail = newNode
149
+	}
150
+	l.size++
151
+}
152
+
153
+// To add a Topic node to the Topic list
154
+func (l *topicList) addTopicNode(topic []float64) {
155
+	newNode := &topicNode{topic, nil, nil, valueList{}}
156
+	if l.head == nil {
157
+		l.head = newNode
158
+		l.tail = l.head
159
+	} else {
160
+		newNode.prev = l.tail
161
+		l.tail.next = newNode
162
+		l.tail = newNode
163
+	}
164
+	l.size++
165
+}
166
+
167
+// To add a Value node to the Value list
168
+func (l *valueList) addValueNode(value []int64) {
169
+	newValNode := &valueNode{value, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}
170
+	if l.head == nil {
171
+		l.head = newValNode
172
+		l.tail = l.head
173
+	} else {
174
+		newValNode.prev = l.tail
175
+		l.tail.next = newValNode
176
+		l.tail = newValNode
177
+	}
178
+	l.size++
179
+}
180
+
181
+// To get the position of a Value node with a specific Value
182
+func (l *valueList) getValueNodePos(value []int64) *valueNode {
183
+	valPtr := l.head
184
+	for valPtr != nil {
185
+		if len(valPtr.val) == 0 {
186
+			valPtr = valPtr.next
187
+			continue
188
+		}
189
+		if Compare(value, valPtr.val) == 0 {
190
+			return valPtr
191
+		}
192
+		valPtr = valPtr.next
193
+	}
194
+	return nil
195
+}
196
+
197
+// To insert sub# into the Operator list of the Value node
198
+func (node *valueNode) insertSub(op string, sub int, issingle bool) {
199
+	if issingle == true {
200
+		switch op {
201
+		case "<":
202
+			node.single2sub_s = append(node.single2sub_s, sub)
203
+		case "<=":
204
+			node.single2sub_es = append(node.single2sub_es, sub)
205
+		case ">":
206
+			node.single2sub_b = append(node.single2sub_b, sub)
207
+		case ">=":
208
+			node.single2sub_eb = append(node.single2sub_eb, sub)
209
+		case "==":
210
+			node.single2sub_e = append(node.single2sub_e, sub)
211
+		}
212
+	} else {
213
+		switch op {
214
+		case "<":
215
+			node.range2sub_s = append(node.range2sub_s, sub)
216
+		case "<=":
217
+			node.range2sub_es = append(node.range2sub_es, sub)
218
+		case ">":
219
+			node.range2sub_b = append(node.range2sub_b, sub)
220
+		case ">=":
221
+			node.range2sub_eb = append(node.range2sub_eb, sub)
222
+		}
223
+	}
224
+}
225
+
226
+// To check if sub# is in the Value node's operator list
227
+func (node *valueNode) findOperatorList(op string, sub int, issingle bool) int {
228
+	ret := -1
229
+	if issingle == true {
230
+		switch op {
231
+		case "<":
232
+			ret = findSub(node.single2sub_s, sub)
233
+		case "<=":
234
+			ret = findSub(node.single2sub_es, sub)
235
+		case ">":
236
+			ret = findSub(node.single2sub_b, sub)
237
+		case ">=":
238
+			ret = findSub(node.single2sub_eb, sub)
239
+		case "==":
240
+			ret = findSub(node.single2sub_e, sub)
241
+		}
242
+	} else {
243
+		switch op {
244
+		case "<":
245
+			ret = findSub(node.range2sub_s, sub)
246
+		case "<=":
247
+			ret = findSub(node.range2sub_es, sub)
248
+		case ">":
249
+			ret = findSub(node.range2sub_b, sub)
250
+		case ">=":
251
+			ret = findSub(node.range2sub_eb, sub)
252
+		}
253
+	}
254
+	if ret < 0 {
255
+		return 0
256
+	} else {
257
+		return 1
258
+	}
259
+}
260
+
261
+// To check if a specific Value node is empty
262
+func (node *valueNode) isEmpty() bool {
263
+	empty := true
264
+	if len(node.single2sub_s) != 0 {
265
+		empty = false
266
+	}
267
+	if len(node.single2sub_es) != 0 {
268
+		empty = false
269
+	}
270
+	if len(node.single2sub_b) != 0 {
271
+		empty = false
272
+	}
273
+	if len(node.single2sub_eb) != 0 {
274
+		empty = false
275
+	}
276
+	if len(node.single2sub_e) != 0 {
277
+		empty = false
278
+	}
279
+	if len(node.range2sub_s) != 0 {
280
+		empty = false
281
+	}
282
+	if len(node.range2sub_es) != 0 {
283
+		empty = false
284
+	}
285
+	if len(node.range2sub_b) != 0 {
286
+		empty = false
287
+	}
288
+	if len(node.range2sub_eb) != 0 {
289
+		empty = false
290
+	}
291
+	return empty
292
+}

+ 174
- 0
src/broker/modules/list_test.go Bestand weergeven

@@ -0,0 +1,174 @@
1
+package modules
2
+
3
+import (
4
+	_ "fmt"
5
+	"github.com/stretchr/testify/assert"
6
+	"testing"
7
+)
8
+
9
+func Test_addTopicNode(t *testing.T) {
10
+
11
+	//list := &topicList{nil, nil, 0}
12
+	//ary := []int64{123, 456, 789, 1234}
13
+	//
14
+	//for i := 0; i < 4; i++ {
15
+	//	tmp := []int64{ary[i]}
16
+	//	list.addTopicNode(tmp)
17
+	//	exp := []int64{ary[i]}
18
+	//
19
+	//	assert.Equal(t, list.tail.topic, exp, "add TopicNode failed")
20
+	//}
21
+}
22
+
23
+func Test_addValueNode(t *testing.T) {
24
+
25
+	list := &valueList{nil, nil, 0}
26
+	ary := []int64{123, 456, 789, 1234}
27
+
28
+	for i := 0; i < 4; i++ {
29
+		tmp := []int64{ary[i]}
30
+		list.addValueNode(tmp)
31
+		exp := []int64{ary[i]}
32
+
33
+		assert.Equal(t, list.tail.val, exp, "add ValueNode failed")
34
+	}
35
+}
36
+
37
+func Test_findSub(t *testing.T) {
38
+
39
+	ary := []int{1, 2, 3, 4, 5, 6}
40
+	for i := 0; i < 6; i++ {
41
+		assert.Equal(t, findSub(ary, i+1), i, "findSub is failed")
42
+	}
43
+
44
+	// Don't Exist
45
+	assert.Equal(t, findSub(ary, 7), -1, "findSub is failed")
46
+
47
+	// array = {1, 2, 3, 4, 5, 6} -> {1, 2, 4, 5, 6}
48
+	ary = remove(ary, 2)
49
+	assert.Equal(t, findSub(ary, 6), 4)
50
+}
51
+
52
+func Test_remove(t *testing.T) {
53
+	l := make([]int, 4)
54
+	var i int
55
+	for i = 0; i < 4; i++ {
56
+		l[i] = i
57
+	}
58
+
59
+	// array = {0, 1, 2, 3} -> {0, 1, 3}
60
+	l = remove(l, 2)
61
+	assert.Equal(t, l, []int{0, 1, 3}, "Array Delete is failed")
62
+
63
+	// array = {0, 1, 3} -> {1, 3}
64
+	l = remove(l, 0)
65
+	assert.Equal(t, l, []int{1, 3}, "Array Delete is failed")
66
+
67
+	// array = {1, 2} -> {1}
68
+	l = remove(l, 1)
69
+	assert.Equal(t, l, []int{1}, "Array Delete is failed")
70
+
71
+	// array = {1} -> {}
72
+	l = remove(l, 0)
73
+	assert.Equal(t, l, []int{}, "Array Delete is failed")
74
+}
75
+
76
+func Test_isempty(t *testing.T) {
77
+	node := &valueNode{next: nil, prev: nil}
78
+
79
+	// 1. One element in Node
80
+	node.single2sub_eb = append(node.single2sub_eb, 1)
81
+	assert.Equal(t, node.isEmpty(), false, "isEmpty is failed")
82
+
83
+	// 2. Empty
84
+	node.single2sub_eb = remove(node.single2sub_eb, 0)
85
+	assert.Equal(t, node.isEmpty(), true, "isEmpty is failed")
86
+
87
+	// 3. Many elements in Node
88
+	node.single2sub_b = append(node.single2sub_b, 1)
89
+	node.single2sub_eb = append(node.single2sub_eb, 1)
90
+	node.single2sub_s = append(node.single2sub_s, 1)
91
+	node.single2sub_es = append(node.single2sub_es, 1)
92
+	node.single2sub_e = append(node.single2sub_e, 1)
93
+	assert.Equal(t, node.isEmpty(), false, "isEmpty is failed")
94
+}
95
+
96
+func Test_insertSub(t *testing.T) {
97
+	node := &valueNode{next: nil, prev: nil}
98
+
99
+	// 1. Single2sub
100
+
101
+	// (1) <
102
+	node.insertSub("<", 1, true)
103
+	assert.Equal(t, 0, findSub(node.single2sub_s, 1), "singleInsertSub (<) is failed")
104
+
105
+	// (2) <=
106
+	node.insertSub("<=", 2, true)
107
+	assert.Equal(t, 0, findSub(node.single2sub_es, 2), "singleInsertSub (<=) is failed")
108
+
109
+	// (3) >
110
+	node.insertSub(">", 3, true)
111
+	assert.Equal(t, 0, findSub(node.single2sub_b, 3), "singleInsertSub (>) is failed")
112
+
113
+	// (4) >=
114
+	node.insertSub(">=", 4, true)
115
+	assert.Equal(t, 0, findSub(node.single2sub_eb, 4), "singleInsertSub (>=) is failed")
116
+
117
+	// (5) ==
118
+	node.insertSub("==", 5, true)
119
+	assert.Equal(t, 0, findSub(node.single2sub_e, 5), "singleInsertSub (==) is failed")
120
+
121
+	// 2. range2sub
122
+
123
+	// (1) <
124
+	node.insertSub("<", 1, false)
125
+	assert.Equal(t, 0, findSub(node.range2sub_s, 1), "rangeInsertSub (<) is failed")
126
+
127
+	// (2) <=
128
+	node.insertSub("<=", 2, false)
129
+	assert.Equal(t, 0, findSub(node.range2sub_es, 2), "rangeInsertSub (<=) is failed")
130
+
131
+	// (3) >
132
+	node.insertSub(">", 3, false)
133
+	assert.Equal(t, 0, findSub(node.range2sub_b, 3), "rangeInsertSub (>) is failed")
134
+
135
+	// (4) >=
136
+	node.insertSub(">=", 4, false)
137
+	assert.Equal(t, 0, findSub(node.range2sub_eb, 4), "rangeInsertSub (>=) is failed")
138
+
139
+}
140
+
141
+func Test_getTopicPos(t *testing.T) {
142
+	//l := topicList{}
143
+	//l.addTopicNode(nil)
144
+	//
145
+	//// 1. find Topic in empty topiclist
146
+	//if l.head.topic != nil {
147
+	//	fmt.Println("addTopicNode is failed")
148
+	//}
149
+	//
150
+	//// 2. find {{12},{34},{56},{78},{89},{90}} in topiclist
151
+	//ary := [][]int64{{12}, {34}, {56}, {78}, {89}, {90}}
152
+	//if l.getTopicNodePos(ary[0]) != nil {
153
+	//	fmt.Println("getTopicNodePos is failed")
154
+	//}
155
+	//
156
+	//for i := 0; i < 5; i++ {
157
+	//	l.addTopicNode(ary[i])
158
+	//	assert.Equal(t, ary[i], l.tail.topic)
159
+	//}
160
+
161
+}
162
+
163
+func Test_getValuePos(t *testing.T) {
164
+	l := valueList{}
165
+	l.addValueNode(nil)
166
+
167
+	ary := [][]int64{{12}, {34}, {56}, {78}, {89}, {90}}
168
+
169
+	// 1. find {{12},{34},{56},{78},{89},{90}} in topiclist
170
+	for i := 0; i < 5; i++ {
171
+		l.addValueNode(ary[i])
172
+		assert.Equal(t, ary[i], l.tail.val)
173
+	}
174
+}

+ 40
- 0
src/broker/modules/logConfig.go Bestand weergeven

@@ -0,0 +1,40 @@
1
+package modules
2
+
3
+import (
4
+	"go.uber.org/zap"
5
+	"go.uber.org/zap/zapcore"
6
+)
7
+
8
+func MyEncoderConfig() zapcore.EncoderConfig {
9
+	return zapcore.EncoderConfig{
10
+		// Keys can be anything except the empty string.
11
+		TimeKey:  "T",
12
+		LevelKey: "L",
13
+		NameKey:  "N",
14
+		//CallerKey:      "C",
15
+		FunctionKey: zapcore.OmitKey,
16
+		MessageKey:  "M",
17
+		//StacktraceKey:  "S",
18
+		LineEnding:     zapcore.DefaultLineEnding,
19
+		EncodeLevel:    zapcore.CapitalColorLevelEncoder,
20
+		EncodeTime:     zapcore.ISO8601TimeEncoder,
21
+		EncodeDuration: zapcore.StringDurationEncoder,
22
+		EncodeCaller:   zapcore.ShortCallerEncoder,
23
+	}
24
+}
25
+
26
+func NewMyLogger() *zap.Logger {
27
+	cfg := zap.Config{
28
+		Level:            zap.NewAtomicLevelAt(zap.DebugLevel),
29
+		Development:      true,
30
+		Encoding:         "console",
31
+		EncoderConfig:    MyEncoderConfig(),
32
+		OutputPaths:      []string{"stdout", "./test.log"},
33
+		ErrorOutputPaths: []string{"stderr"},
34
+	}
35
+	logger, err := cfg.Build()
36
+	if err != nil {
37
+		panic(err)
38
+	}
39
+	return logger
40
+}

+ 87
- 0
src/broker/modules/manage.go Bestand weergeven

@@ -0,0 +1,87 @@
1
+package modules
2
+
3
+import "sync"
4
+
5
+//각 Microservice에 대한 정보 저장 노드
6
+type MSNode struct {
7
+	nodeName string //Nodename- 현재 데모에서는 IpAddress와 같음
8
+	ipAddr   string
9
+}
10
+
11
+//Nodename 반환
12
+func (node *MSNode) GetName() string {
13
+	return node.nodeName
14
+}
15
+
16
+//IpAddress 반환
17
+func (node *MSNode) GetIpaddr() string {
18
+	return node.ipAddr
19
+}
20
+
21
+type NodeManager interface {
22
+	GetIpaddr(nodeName string) (string, bool) //IpAddress반환
23
+	AddMicroservice(node MSNode) bool         //MS추가
24
+	RemoveMicroservice(nodeName string) bool  //MS삭제
25
+}
26
+
27
+//모든 Microservice정보 저장
28
+type MStable struct {
29
+	NodeTable map[string]MSNode
30
+	mu        sync.RWMutex
31
+}
32
+
33
+//MStable 생성자
34
+func NewMStable() *MStable {
35
+	logger := NewMyLogger()
36
+	defer logger.Sync()
37
+
38
+	defer logger.Debug("node manager setting complete.")
39
+	return &MStable{NodeTable: make(map[string]MSNode)}
40
+}
41
+
42
+//IpAddress반환
43
+func (manager *MStable) GetIpaddr(nodeName string) (string, bool) {
44
+	//해당 이름의 노드이름이 존재하는지 확인
45
+	node, exists := manager.NodeTable[nodeName]
46
+
47
+	//존재하지 않는 경우 nil리턴
48
+	if !exists {
49
+		return "", false
50
+	} else {
51
+		return node.ipAddr, true
52
+	}
53
+}
54
+
55
+//MS추가
56
+func (manager *MStable) AddMicroservice(node MSNode) bool {
57
+	//삽입 전 존재여부 확인
58
+	manager.mu.RLock()
59
+	_, exists := manager.NodeTable[node.GetName()] ////해당 Node의 이름이 있는지 검색
60
+	//manager.mu.Unlock()
61
+	manager.mu.RUnlock()
62
+
63
+	if exists {
64
+		return false
65
+	} else { //존재안한다면 추가 (존재할경우 이미 있는것이기 때문에 추가할 필요 없음)
66
+		//manager.mu.Lock()
67
+		manager.NodeTable[node.GetName()] = node
68
+		//manager.mu.Unlock()
69
+		return true
70
+	}
71
+}
72
+
73
+//MS삭제
74
+func (manager *MStable) RemoveMicroservice(nodeName string) bool {
75
+	logger := NewMyLogger()
76
+	defer logger.Sync()
77
+	//삭제 전 존재여부 확인
78
+	_, exists := manager.NodeTable[nodeName] //해당 이름을 가진 Node가 있는지 검색
79
+
80
+	if exists {
81
+		delete(manager.NodeTable, nodeName) //존재한다면 삭제
82
+		logger.Info("[" + nodeName + "] : service quit")
83
+		return true
84
+	} else {
85
+		return false
86
+	}
87
+}

+ 170
- 0
src/broker/modules/matching.go Bestand weergeven

@@ -0,0 +1,170 @@
1
+package modules
2
+
3
+import (
4
+	_ "errors"
5
+)
6
+
7
+type match_manager struct {
8
+	match_count int // Number of successful matches
9
+}
10
+
11
+// Matching operation method
12
+// 1. Traverse TopicList for match (msg.topic == topicNode.topic)
13
+// 2. Traverse TopicNode.ValueList for match (msg.value <operator> valueNode.value)
14
+// 3. Add ipaddr to ipList when successful matching
15
+// 4. Add ipaddr to ipList when range matching
16
+// 5. Delete Duplicated Ipaddrs
17
+// @Return (list of IP addresses of matched subs, Pub Msg, error)
18
+func (moscato *Moscato) Matching(msg MsgUnit) {
19
+
20
+	//topic := msg.(PublishMsg).Topic
21
+	//value := msg.(PublishMsg).Value
22
+	//sub_mng := moscato.SubscriptionManager
23
+	//
24
+	//// iplist for return
25
+	//ipList := make([]string, 0)
26
+	//
27
+	//// list for matched range subscriptions
28
+	//big := make([]int, 0)
29
+	//small := make([]int, 0)
30
+	//
31
+	//// Find (topicNode[Topic] == msg.Topic) Node
32
+	//topicPtr := sub_mng.list
33
+	//pos := topicPtr.getTopicNodePos(topic)
34
+	//
35
+	//// Don't Exist topicNode
36
+	//if pos == nil {
37
+	//	moscato.SendQueue <- myType{nil, msg, errors.New("Don't Exist Matching Topic")}
38
+	//} else {
39
+	//	// Traverse all valueNode -> and Match
40
+	//	valPtr := pos.list.head
41
+	//	for valPtr != nil {
42
+	//		compare := Compare(valPtr.val, value)
43
+	//		if compare > 0 { // sub.val > pub.val
44
+	//			// single : { >, >= }
45
+	//
46
+	//			// (1) case : >
47
+	//			for i := 0; i < len(valPtr.single2sub_b); i++ {
48
+	//				sub := valPtr.single2sub_b[i]
49
+	//				ip := sub_mng.sub2ip[sub]
50
+	//				ipList = append(ipList, ip)
51
+	//			}
52
+	//
53
+	//			// (2) case : >=
54
+	//			for i := 0; i < len(valPtr.single2sub_eb); i++ {
55
+	//				sub := valPtr.single2sub_eb[i]
56
+	//				ip := sub_mng.sub2ip[sub]
57
+	//				ipList = append(ipList, ip)
58
+	//			}
59
+	//			// range : { >, >= }
60
+	//
61
+	//			// (1) case : >
62
+	//			for i := 0; i < len(valPtr.range2sub_b); i++ {
63
+	//				sub := valPtr.range2sub_b[i]
64
+	//				big = append(big, sub)
65
+	//			}
66
+	//
67
+	//			// (2) case : >=
68
+	//			for i := 0; i < len(valPtr.range2sub_eb); i++ {
69
+	//				sub := valPtr.range2sub_eb[i]
70
+	//				big = append(big, sub)
71
+	//			}
72
+	//
73
+	//		} else if compare < 0 { // sub.val < pub.val
74
+	//
75
+	//			// single : { <, <= }
76
+	//
77
+	//			// (1) case : <
78
+	//			for i := 0; i < len(valPtr.single2sub_s); i++ {
79
+	//				sub := valPtr.single2sub_s[i]
80
+	//				ip := sub_mng.sub2ip[sub]
81
+	//				ipList = append(ipList, ip)
82
+	//			}
83
+	//
84
+	//			// (2) case : <=
85
+	//			for i := 0; i < len(valPtr.single2sub_es); i++ {
86
+	//				sub := valPtr.single2sub_es[i]
87
+	//				ip := sub_mng.sub2ip[sub]
88
+	//				ipList = append(ipList, ip)
89
+	//			}
90
+	//
91
+	//			// range : { <, <= }
92
+	//
93
+	//			// (1) case : <
94
+	//			for i := 0; i < len(valPtr.range2sub_s); i++ {
95
+	//				sub := valPtr.range2sub_s[i]
96
+	//				small = append(small, sub)
97
+	//			}
98
+	//			// (2) case : <=
99
+	//			for i := 0; i < len(valPtr.range2sub_es); i++ {
100
+	//				sub := valPtr.range2sub_es[i]
101
+	//				small = append(small, sub)
102
+	//			}
103
+	//
104
+	//		} else { // sub.val == pub.val
105
+	//
106
+	//			// single : { <=, >=, ==}
107
+	//
108
+	//			// (1) case : <=
109
+	//			for i := 0; i < len(valPtr.single2sub_es); i++ {
110
+	//				sub := valPtr.single2sub_es[i]
111
+	//				ip := sub_mng.sub2ip[sub]
112
+	//				ipList = append(ipList, ip)
113
+	//			}
114
+	//
115
+	//			// (2) case : >=
116
+	//			for i := 0; i < len(valPtr.single2sub_eb); i++ {
117
+	//				sub := valPtr.single2sub_eb[i]
118
+	//				ip := sub_mng.sub2ip[sub]
119
+	//				ipList = append(ipList, ip)
120
+	//			}
121
+	//
122
+	//			// (3) case : ==
123
+	//			for i := 0; i < len(valPtr.single2sub_e); i++ {
124
+	//				sub := valPtr.single2sub_e[i]
125
+	//				ip := sub_mng.sub2ip[sub]
126
+	//				ipList = append(ipList, ip)
127
+	//			}
128
+	//
129
+	//			// range : { <=, >= }
130
+	//
131
+	//			// (1) case : <=
132
+	//			for i := 0; i < len(valPtr.range2sub_es); i++ {
133
+	//				sub := valPtr.range2sub_es[i]
134
+	//				small = append(small, sub)
135
+	//			}
136
+	//
137
+	//			// (2) case : >=
138
+	//			for i := 0; i < len(valPtr.range2sub_eb); i++ {
139
+	//				sub := valPtr.range2sub_eb[i]
140
+	//				big = append(big, sub)
141
+	//			}
142
+	//
143
+	//		}
144
+	//		valPtr = valPtr.next
145
+	//	}
146
+	//
147
+	//	// Add the intersection IP address of two sets (large and small) to the return list
148
+	//	intersectSubnumbers := intersect.Hash(small, big)
149
+	//	list := reflect.ValueOf(intersectSubnumbers)
150
+	//	for i := 0; i < list.Len(); i++ {
151
+	//		sub := list.Index(i).Interface().(int)
152
+	//		ip := sub_mng.sub2ip[sub]
153
+	//		ipList = append(ipList, ip)
154
+	//	}
155
+	//	moscato.MatchingManager.match_count++
156
+	//
157
+	//	// To delete Duplicated Ipaddr
158
+	//	keys := make(map[string]bool)
159
+	//	retIpList := []string{}
160
+	//	for _, val := range ipList {
161
+	//		if _, saveVal := keys[val]; !saveVal{
162
+	//			keys[val] = true
163
+	//			retIpList = append(retIpList, val)
164
+	//		}
165
+	//	}
166
+	//
167
+	//	// Return {IpaddressList, PubMsg, ErrorMsg} to SendQueue
168
+	//	moscato.SendQueue <- myType{retIpList, msg, nil}
169
+	//}
170
+}

+ 170
- 0
src/broker/modules/matching_test.go Bestand weergeven

@@ -0,0 +1,170 @@
1
+package modules
2
+
3
+//
4
+//import (
5
+//	"fmt"
6
+//	"github.com/stretchr/testify/assert"
7
+//	"math/rand"
8
+//	"strconv"
9
+//	"testing"
10
+//	"time"
11
+//)
12
+//
13
+//
14
+//func makePubData(IsAlpha bool) MsgUnit{
15
+//	rand.Seed(time.Now().UnixNano())
16
+//	// Set Ipaddr
17
+//	msg := Message{"", "1.0", "", PM}
18
+//	for i := 0; i < 4; i++{
19
+//		itoa := strconv.Itoa(rand.Int() % 256)
20
+//		msg.From += itoa
21
+//		if i != 3{
22
+//			msg.From += "."
23
+//		}
24
+//	}
25
+//
26
+//	// Set Time
27
+//	msg.Time += strconv.Itoa(rand.Int()%24) + ":"
28
+//	msg.Time += strconv.Itoa(rand.Int()%60)
29
+//
30
+//	Topic := []int64{}
31
+//	Value := []int64{}
32
+//	content := []int64{}
33
+//
34
+//	// Set Topic
35
+//	topicLen := rand.Int() % 10 + 1
36
+//	for i := 0 ; i < topicLen; i++{
37
+//		Topic = append(Topic, rand.Int63())
38
+//	}
39
+//
40
+//	// Set Value
41
+//	if IsAlpha {
42
+//		valueLen := rand.Int() % 10 + 1
43
+//		for i := 0 ; i < valueLen; i++{
44
+//			Value = append(Value, rand.Int63())
45
+//		}
46
+//	} else {
47
+//		Topic = append(Topic, rand.Int63())
48
+//		Value = append(Value, rand.Int63())
49
+//	}
50
+//
51
+//	// Set content
52
+//	contentLen := rand.Int() % 10
53
+//	for i := 0; i < contentLen; i++{
54
+//		content = append(content, rand.Int63())
55
+//	}
56
+//
57
+//	return &PublishMsg{msg, Topic, Value, content}
58
+//}
59
+//
60
+//func makeSubData(IsAlpha bool, Topic []int64, Value []int64) MsgUnit{
61
+//	rand.Seed(time.Now().UnixNano())
62
+//	// Set Ipaddr
63
+//	msg := Message{"", "1.0", "", SM}
64
+//	for i := 0; i < 4; i++{
65
+//		itoa := strconv.Itoa(rand.Int() % 256)
66
+//		msg.From += itoa
67
+//		if i != 3{
68
+//			msg.From += "."
69
+//		}
70
+//	}
71
+//
72
+//	// Set Time
73
+//	msg.Time += strconv.Itoa(rand.Int()%24) + ":"
74
+//	msg.Time += strconv.Itoa(rand.Int()%60)
75
+//
76
+//	// Set Topic, Value, Operator
77
+//	Operator := []string{}
78
+//	candOp := []string{">", ">=", "<=" ,"<", "=="}
79
+//	logicalOp := []string{"&&", "||"}
80
+//
81
+//	if IsAlpha {
82
+//		Operator = append(Operator, "==")
83
+//	} else {
84
+//		randSeed := rand.Int() % 2
85
+//		if randSeed == 0 {
86
+//			lop := rand.Int() % 2
87
+//			Operator = append(Operator, candOp[rand.Int()%2])
88
+//			if lop == 0 {
89
+//				Operator = append(Operator, logicalOp[0])
90
+//				for {
91
+//					x := rand.Int63()
92
+//					if x > Value[0] {
93
+//						Value = append(Value, x)
94
+//						break
95
+//					}
96
+//				}
97
+//			} else {
98
+//				Operator = append(Operator, logicalOp[1])
99
+//				Value = append(Value, rand.Int63())
100
+//			}
101
+//			Operator = append(Operator, candOp[rand.Int()%2+2])
102
+//		} else {
103
+//			op := candOp[rand.Int()%5]
104
+//			Operator = append(Operator, op)
105
+//		}
106
+//	}
107
+//
108
+//	return &SubscriptionMsg{msg, Topic, Value, Operator, IsAlpha}
109
+//}
110
+//
111
+//func Test_matching(t *testing.T) {
112
+//	rand.Seed(time.Now().UnixNano())
113
+//	assert.Equal(t, 1, 1)
114
+//	mos := Moscato{ sub_mng: *newSubmng(),}
115
+//	mos.queue.queue_init()
116
+//
117
+//	// 1. Make Publish Data
118
+//	pubLen := rand.Int()%100 + 1 // Set dataLength
119
+//	var pubDataList []MsgUnit
120
+//
121
+//	for i := 0; i < pubLen; i++{
122
+//		if i % 2 == 1 {
123
+//			msg := makePubData(true)
124
+//			mos.queue.push(msg)
125
+//			pubDataList = append(pubDataList, msg)
126
+//		} else{
127
+//			msg := makePubData(false)
128
+//			mos.queue.push(msg)
129
+//			pubDataList = append(pubDataList, msg)
130
+//		}
131
+//	}
132
+//
133
+//	// 2. Creates subscription data with (the same <Topic, Value> or (the same <Topic, difValue>
134
+//	//    And Add Subscription
135
+//	subLen := pubLen
136
+//	var subDataList []MsgUnit
137
+//	for i := 0; i < subLen; i++{
138
+//		Topic := pubDataList[i].(*PublishMsg).Topic
139
+//		Value := pubDataList[i].(*PublishMsg).Value
140
+//		if i % 2 == 1{
141
+//			msg := makeSubData(true, Topic, Value)
142
+//			mos.sub_mng.addSubscription(msg)
143
+//			subDataList = append(subDataList, msg)
144
+//
145
+//		} else{
146
+//			msg := makeSubData(false, Topic, Value)
147
+//			mos.sub_mng.addSubscription(msg)
148
+//			subDataList = append(subDataList, msg)
149
+//		}
150
+//	}
151
+//
152
+//	// 3. Watch Data
153
+//
154
+//  	// fmt.Println("PubData")
155
+//	// watchData(pubDataList, pubLen, false)
156
+//	// fmt.Println("\nSubData")
157
+//	// watchData(subDataList, subLen, true)
158
+//
159
+//	// 4. Matching
160
+//	for i := 0; i < pubLen; i++ {
161
+//		matching, pubmsg, err := mos.MatchingManager.Matching(&mos.queue, &mos.sub_mng)
162
+//
163
+//		// Check if matching is correct
164
+//		assert.Equal(t, nil, err)
165
+//
166
+//		fmt.Println("matching list = ", matching)
167
+//		fmt.Println("pub msg = ", pubmsg)
168
+//		fmt.Println("err ?= ", err)
169
+//	}
170
+//}

+ 152
- 0
src/broker/modules/message.go Bestand weergeven

@@ -0,0 +1,152 @@
1
+package modules
2
+
3
+import "encoding/json"
4
+
5
+//*****메세지 타입 상수화
6
+const (
7
+	KGM = 1 + iota //KeyGenMessage
8
+	KSM            //KeyShareMessage
9
+	PM             //PublishMessage
10
+	SM             //SubscriptionMessage
11
+	RM             //RegisterMessage
12
+	WM             //WithdrawMessage
13
+)
14
+
15
+//*****메세지 틀*****
16
+type Message struct {
17
+	From    string //메세지 만든 주체의 Ip주소
18
+	Version string //메세지 버전
19
+	Time    string //메세지 만든 시간
20
+	Kind    int    //메세지 종류
21
+}
22
+
23
+type MsgUnit interface {
24
+	ConvertToJson() ([]byte, error) //메세지를 JSON형식으로 바꿔주는 멤버함수
25
+	CheckType() int                 //메세지의 종류를 반환하는 함수
26
+}
27
+
28
+//*****각 메세지 형식 및 정의**********
29
+
30
+//KeyGen 명령 메세지
31
+type PublishedImage struct {
32
+	Message
33
+	Topic []float64
34
+}
35
+
36
+type KeyGenMsg struct {
37
+	Message
38
+	iptable []string
39
+}
40
+
41
+//Key공유 메세지
42
+type KeyShareMsg struct {
43
+	Message
44
+	key string
45
+}
46
+
47
+//전달할 내용을 담은 메세지
48
+type PublishMsg struct {
49
+	Message
50
+	Topic   []int64 //대주제	ex)soccer
51
+	Value   []int64 //topic의 세부적인 내용 ex)ManCity or 40
52
+	Content []int64 //내용 ex)오늘 케빈 데 브라위너가 골을 넣었다
53
+}
54
+
55
+type SubscriptionImage struct {
56
+	Message
57
+	Topic []float64
58
+}
59
+
60
+//구독 정보를 담은 메세지
61
+type SubscriptionMsg struct {
62
+	Message
63
+	Topic    []int64  //대주제 ex)soccer
64
+	Value    []int64  //피연산자 ex)Mancity or 20 40
65
+	Operator []string //연산자	ex) ==, < && >
66
+	IsAlpha  bool     //value가 숫자인지 문자열인지, 문자열이면 단순비교, 숫자이면 범위연산
67
+}
68
+
69
+type RegisterImgMsg struct {
70
+	Message
71
+	PrivateKey float64
72
+}
73
+
74
+//Microservice 등록 메세지
75
+type RegisterMsg struct {
76
+	Message
77
+	PrivateKey int64
78
+}
79
+
80
+//Microservice 탈퇴 메세지(없앰)
81
+type WithdrawMsg struct {
82
+	Message
83
+}
84
+
85
+func (msg SubscriptionImage) ConvertToJson() ([]byte, error) {
86
+	js := msg
87
+	jsonBytes, err := json.Marshal(js)
88
+	return jsonBytes, err
89
+}
90
+
91
+//******ConverToJson을 메세지 종류별로 실행가능하게 구현******
92
+func (msg KeyGenMsg) ConvertToJson() ([]byte, error) {
93
+	js := msg
94
+	jsonBytes, err := json.Marshal(js)
95
+	return jsonBytes, err
96
+}
97
+
98
+func (msg KeyShareMsg) ConvertToJson() ([]byte, error) {
99
+	js := msg
100
+	jsonBytes, err := json.Marshal(js)
101
+	return jsonBytes, err
102
+}
103
+
104
+func (msg PublishedImage) ConvertToJson() ([]byte, error) {
105
+	js := msg
106
+	jsonBytes, err := json.Marshal(js)
107
+	return jsonBytes, err
108
+}
109
+
110
+func (msg PublishMsg) ConvertToJson() ([]byte, error) {
111
+	js := msg
112
+	jsonBytes, err := json.Marshal(js)
113
+	return jsonBytes, err
114
+}
115
+
116
+func (msg SubscriptionMsg) ConvertToJson() ([]byte, error) {
117
+	js := msg
118
+	jsonBytes, err := json.Marshal(js)
119
+	return jsonBytes, err
120
+}
121
+
122
+func (msg RegisterImgMsg) ConvertToJson() ([]byte, error) {
123
+	js := msg
124
+	jsonBytes, err := json.Marshal(js)
125
+	return jsonBytes, err
126
+}
127
+
128
+func (msg RegisterMsg) ConvertToJson() ([]byte, error) {
129
+	js := msg
130
+	jsonBytes, err := json.Marshal(js)
131
+	return jsonBytes, err
132
+}
133
+
134
+func (msg WithdrawMsg) ConvertToJson() ([]byte, error) {
135
+	js := msg
136
+	jsonBytes, err := json.Marshal(js)
137
+	return jsonBytes, err
138
+}
139
+
140
+//CheckType함수 구현
141
+func (msg Message) CheckType() int {
142
+	return msg.Kind //메세지 멤버변수 Kind 리턴
143
+}
144
+
145
+//KeyGenMsg 생성자
146
+func NewKeyGenMsg(table *MStable) *KeyGenMsg {
147
+	m := &KeyGenMsg{}
148
+	for _, value := range table.NodeTable { // MicroService테이블에서 ip주소를 다 가져와서 iptable에 넣음
149
+		m.iptable = append(m.iptable, value.GetIpaddr())
150
+	}
151
+	return m
152
+}

+ 55
- 0
src/broker/modules/queue.go Bestand weergeven

@@ -0,0 +1,55 @@
1
+package modules
2
+
3
+import (
4
+	"errors"
5
+)
6
+
7
+type MsgQueue struct {
8
+	queue     chan MsgUnit
9
+	QueueFunc QueueOperate
10
+}
11
+
12
+type QueueOperate interface {
13
+	queue_init() error              //Queue 초기화 멤버함수
14
+	push(msg Message) bool          //Queue push 멤버함수
15
+	pop(wait bool) (Message, error) //Queue pop 멤버함수 (wait -> busy waiting 여부 결정)
16
+}
17
+
18
+//Message Queue를 초기화 해주는 함수
19
+func (mq *MsgQueue) queue_init() error {
20
+	logger := NewMyLogger()
21
+	logger.Sync()
22
+
23
+	if mq.queue != nil && len(mq.queue) != 0 {
24
+		return errors.New("Queue Hadlerer Error: Already initialized.")
25
+	} else if mq.queue == nil {
26
+		mq.queue = make(chan MsgUnit, 1000)
27
+		//log.Println("queue is initialized.")
28
+		logger.Debug("queue is initialized")
29
+		return nil
30
+	} else {
31
+		close(mq.queue)
32
+		mq.queue = make(chan MsgUnit)
33
+		return nil
34
+	}
35
+}
36
+
37
+//push 함수
38
+func (mq *MsgQueue) push(msg MsgUnit) bool {
39
+	mq.queue <- msg
40
+	return true
41
+}
42
+
43
+//pop 함수
44
+func (mq *MsgQueue) pop(block bool) MsgUnit {
45
+	if block == false {
46
+		if len(mq.queue) == 0 {
47
+			return nil
48
+		} else {
49
+			return <-mq.queue
50
+		}
51
+	} else {
52
+		//queue에 데이터가 들어올 때까지 block
53
+		return <-mq.queue
54
+	}
55
+}

+ 102
- 0
src/broker/modules/secure.go Bestand weergeven

@@ -0,0 +1,102 @@
1
+package modules
2
+
3
+import (
4
+	"fmt"
5
+	"strconv"
6
+)
7
+
8
+// 키관리 부분, 노드 입력받고 키 반환하는 부분 구현
9
+type Security struct {
10
+	KeyMap map[string]string
11
+}
12
+
13
+//Security 생성자
14
+func NewSecurity() *Security {
15
+	logger := NewMyLogger()
16
+	defer logger.Sync()
17
+	security := &Security{map[string]string{}}
18
+	defer logger.Debug("security setting complete.")
19
+	return security
20
+}
21
+
22
+type SecurityManager interface {
23
+	RegKey(rm RegisterMsg) // 원래 RegisterMsg
24
+	GetNodeKey(nodeName string) int64
25
+	ReEncrypt(fromKey int64, toKey int64, target []int64) []int64
26
+	ReEncPubMsg(fromPubMsg PublishMsg, nodeName string) PublishMsg
27
+	RemoveSecureKey(nodeName string) bool
28
+	//CompareTopic(topic1 []int64, topic2 []int64) int
29
+	//CompareDigit(topic1 int64, topic2 int64) int
30
+	//CompareAlpha(topic1 []int64, topic2 []int64) int
31
+}
32
+
33
+/*
34
+keyShareMsg 에서 각 노드의 private 키를 받아 keyMap 에 저장
35
+*/
36
+func (sc Security) RegKey(rm RegisterMsg) {
37
+	sc.KeyMap[rm.Message.From] = strconv.FormatInt(rm.PrivateKey, 10)
38
+}
39
+
40
+/**
41
+각 노드의 키를 주소를 이용하여 맵에서 가져옴
42
+*/
43
+func (sc Security) GetNodeKey(nodeName string) int64 {
44
+
45
+	messageStringKey := sc.KeyMap[nodeName]
46
+	mKey, err := strconv.ParseInt(messageStringKey, 10, 64)
47
+	if err != nil {
48
+		fmt.Println("GetNodeKey Error: key string to int64 parsing error.")
49
+	}
50
+	return mKey
51
+}
52
+
53
+/*
54
+reEncrypt 해서 슬라이스 반환
55
+*/
56
+func (sc Security) ReEncrypt(fromKey int64, toKey int64, target []int64) []int64 {
57
+	var tmpTarget []int64
58
+	for index := range target {
59
+		tmpTarget = append(tmpTarget, target[index]-fromKey+toKey)
60
+	}
61
+
62
+	return tmpTarget
63
+}
64
+
65
+func (sc Security) ReEncryptWithoutPrivateKey(toKey int64, target []int64) []int64 {
66
+	var tmpTarget []int64
67
+	for index := range target {
68
+		tmpTarget = append(tmpTarget, target[index]+toKey)
69
+	}
70
+
71
+	return tmpTarget
72
+}
73
+
74
+// topic과 value는 m+k로만 존재하므로 ReEnc과정에서 subscriber의 개인키만 더해주면 된다.
75
+func (sc Security) ReEncPubMsg(fromPubMsg PublishMsg, nodeName string) PublishMsg {
76
+	toKey := sc.GetNodeKey(nodeName)
77
+	fromKey := sc.GetNodeKey(fromPubMsg.Message.From)
78
+
79
+	toPubMsg := PublishMsg{}
80
+	toPubMsg.Message = fromPubMsg.Message
81
+	toPubMsg.Topic = sc.ReEncryptWithoutPrivateKey(toKey, fromPubMsg.Topic)
82
+	toPubMsg.Value = sc.ReEncryptWithoutPrivateKey(toKey, fromPubMsg.Value)
83
+	toPubMsg.Content = sc.ReEncrypt(fromKey, toKey, fromPubMsg.Content)
84
+
85
+	return toPubMsg
86
+}
87
+
88
+//Key제거 함수
89
+func (sc *Security) RemoveSecureKey(nodeName string) bool {
90
+	logger := NewMyLogger()
91
+	defer logger.Sync()
92
+	//삭제 전 존재여부 확인
93
+	_, exists := sc.KeyMap[nodeName]
94
+
95
+	if exists {
96
+		delete(sc.KeyMap, nodeName)
97
+		logger.Debug("[" + nodeName + "] : delete Key successful")
98
+		return true
99
+	} else {
100
+		return false
101
+	}
102
+}

+ 139
- 0
src/broker/modules/secure_test.go Bestand weergeven

@@ -0,0 +1,139 @@
1
+package modules
2
+
3
+import (
4
+	"fmt"
5
+)
6
+
7
+//func TestCompare(t *testing.T) {
8
+//	var security = NewSecurity()
9
+//	var sm SecurityManager
10
+//	sm = security
11
+//
12
+//	ksm := KeyShareMsg{Message: Message{From: "1.1.1.1", Version: "1", Time: "2", Kind: 1}, key: "1234"}
13
+//	sm.RegKey(ksm)
14
+//	sm.GetNodeKey(ksm.Message.From)
15
+//	fmt.Println(sm.GetNodeKey(ksm.Message.From))
16
+//	var targetKey []int64
17
+//	targetKey = []int64{1234, 1235, 1236}
18
+//	fmt.Println(sm.ReEncrypt(sm.GetNodeKey(ksm.Message.From), 0, targetKey))
19
+//	//fmt.Println(sm.CompareDigit(1236, 1234))
20
+//
21
+//}
22
+
23
+func CreatePubMsg(msg Message, topic string, value string, content string) *PublishMsg {
24
+	toPubMsg := new(PublishMsg)
25
+	toPubMsg.Message = msg
26
+
27
+	intArr := []rune(topic)
28
+	//fmt.Print("Topic length ")
29
+	//fmt.Println(len(intArr))
30
+	//fmt.Println(len(toPubMsg.Topic))
31
+	for index := 0; index < len(intArr); index++ {
32
+		toPubMsg.Topic = append(toPubMsg.Topic, int64(intArr[index]))
33
+	}
34
+	//fmt.Println(len(toPubMsg.Topic))
35
+	intArr = []rune(value)
36
+	for index := 0; index < len(intArr); index++ {
37
+		toPubMsg.Value = append(toPubMsg.Value, int64(intArr[index]))
38
+	}
39
+	intArr = []rune(content)
40
+	for index := 0; index < len(intArr); index++ {
41
+		toPubMsg.Content = append(toPubMsg.Content, int64(intArr[index]))
42
+	}
43
+
44
+	return toPubMsg
45
+}
46
+
47
+func EncryptionMsg(msg *PublishMsg, gyKey int64, privateKey int64) *PublishMsg {
48
+	for index := range msg.Topic {
49
+		msg.Topic[index] = msg.Topic[index] + gyKey
50
+	}
51
+	for index := range msg.Value {
52
+		msg.Value[index] = msg.Value[index] + gyKey
53
+	}
54
+	for index := range msg.Content {
55
+		msg.Content[index] = msg.Content[index] + gyKey + privateKey
56
+	}
57
+
58
+	return msg
59
+}
60
+
61
+func DecryptionMsg(msg *PublishMsg, gyKey int64, privateKey int64) {
62
+	for index := range msg.Topic {
63
+		msg.Topic[index] = msg.Topic[index] - gyKey - privateKey
64
+	}
65
+	for index := range msg.Value {
66
+		msg.Value[index] = msg.Value[index] - gyKey - privateKey
67
+	}
68
+	for index := range msg.Content {
69
+		msg.Content[index] = msg.Content[index] - gyKey - privateKey
70
+	}
71
+
72
+	var runeArr []rune
73
+	for index := range msg.Topic {
74
+		runeArr = append(runeArr, rune(int(msg.Topic[index])))
75
+	}
76
+	fmt.Println("Topic is: " + string(runeArr))
77
+	runeArr = nil
78
+
79
+	for index := range msg.Value {
80
+		runeArr = append(runeArr, rune(int(msg.Value[index])))
81
+	}
82
+	fmt.Println("Value is: " + string(runeArr))
83
+	runeArr = nil
84
+
85
+	for index := range msg.Content {
86
+		runeArr = append(runeArr, rune(int(msg.Content[index])))
87
+	}
88
+	fmt.Println("Content is: " + string(runeArr))
89
+	runeArr = nil
90
+
91
+}
92
+
93
+func printMsg(msg *PublishMsg) {
94
+	var runeArr []rune
95
+	for index := range msg.Topic {
96
+		runeArr = append(runeArr, rune(int(msg.Topic[index])))
97
+	}
98
+	fmt.Println("Topic is: " + string(runeArr))
99
+	runeArr = nil
100
+
101
+	for index := range msg.Value {
102
+		runeArr = append(runeArr, rune(int(msg.Value[index])))
103
+	}
104
+	fmt.Println("Value is: " + string(runeArr))
105
+	runeArr = nil
106
+
107
+	for index := range msg.Content {
108
+		runeArr = append(runeArr, rune(int(msg.Content[index])))
109
+	}
110
+	fmt.Println("Content is: " + string(runeArr) + "\n\n")
111
+	runeArr = nil
112
+}
113
+
114
+// From "1.1.1.1" to "3.3.3.3" node
115
+//func TestReEnc(t *testing.T) {
116
+//	var security = NewSecurity()
117
+//	var sm SecurityManager
118
+//	sm = security
119
+//	security.KeyMap["1.1.1.1"] = "56789"
120
+//	security.KeyMap["3.3.3.3"] = "99999"
121
+//
122
+//	//fmt.Println(sm.GetNodeKey("3.3.3.3"))
123
+//	msg := Message{From: "1.1.1.1", Version: "1", Time: "123", Kind: 3}
124
+//	//fmt.Println(msg)
125
+//	publishMsg := CreatePubMsg(msg, "soccer123한글", "playerList", "Son and 10 players")
126
+//	fmt.Println(publishMsg)
127
+//	fmt.Println("original publish message is...")
128
+//	printMsg(publishMsg)
129
+//	encPublishMsg := EncryptionMsg(publishMsg, 1234, 56789)
130
+//	fmt.Println("encrypt publish message by publisher's private key")
131
+//	printMsg(encPublishMsg)
132
+//	fmt.Println(encPublishMsg)
133
+//	reEncPublishMsg := sm.ReEncPubMsg(*encPublishMsg, "3.3.3.3")
134
+//	fmt.Println("re-encrypt publish message by subscriber's private key")
135
+//	printMsg(reEncPublishMsg)
136
+//	//fmt.Println(reEncPublishMsg)
137
+//	fmt.Println("decrypted publish message is...")
138
+//	DecryptionMsg(reEncPublishMsg, 1234, 99999)
139
+//}

+ 301
- 0
src/broker/modules/subscription.go Bestand weergeven

@@ -0,0 +1,301 @@
1
+package modules
2
+
3
+import (
4
+	"errors"
5
+	"fmt"
6
+	"strconv"
7
+)
8
+
9
+// Structure for managing subscriptions
10
+type sub_manager struct {
11
+	list       topicList          // Data Structure for Manage TopicNode
12
+	count_sub  int                // Subscription#
13
+	emptylist  []int              // For administrate Subscription#(Deleted)
14
+	ip2sub     map[string][]int   // For mapping {ip : Sub#s List}
15
+	sub2ip     map[int]string     // For mapping {Sub# : ip}
16
+	sub2node   map[int][]nodeInfo // For mapping {Sub# : NodeInfo List}
17
+	israngesub map[int]bool       // To manage when deleted
18
+
19
+	subnum2ip   map[int]string
20
+	subnum2conv map[int][]float64
21
+}
22
+
23
+type nodeInfo struct {
24
+	valNodeList []*valueNode
25
+	topic       []int64
26
+}
27
+
28
+type convNode struct {
29
+	subNum int
30
+}
31
+
32
+func (manager *sub_manager) Initialize() {
33
+	// Some initialize
34
+	manager.ip2sub = make(map[string][]int)
35
+	manager.sub2ip = make(map[int]string)
36
+	manager.sub2node = make(map[int][]nodeInfo)
37
+	manager.israngesub = make(map[int]bool)
38
+
39
+	manager.subnum2ip = make(map[int]string)
40
+	manager.subnum2conv = make(map[int][]float64)
41
+}
42
+
43
+func newSubmng() *sub_manager {
44
+	subMng := &sub_manager{}
45
+	subMng.Initialize()
46
+	return subMng
47
+}
48
+
49
+func (manager *sub_manager) isDuplicated(msg MsgUnit) bool {
50
+	//from := msg.(SubscriptionImage).From
51
+	//topic := msg.(SubscriptionImage).Topic
52
+	//value := msg.(SubscriptionMsg).Value
53
+	//operator := msg.(SubscriptionMsg).Operator
54
+	//subList := manager.ip2sub[from]
55
+	canFind := false
56
+
57
+	//for i := 0; i < len(subList) && canFind == false; i++ {
58
+	//	sub := subList[i]
59
+	//	nodeinfoList := manager.sub2node[sub]
60
+	//
61
+	//	for j := 0; j < len(nodeinfoList); j++ {
62
+	//		node := nodeinfoList[j]
63
+
64
+	//if Compare(node.topic, topic) == 0 {
65
+	//	cnt := 0
66
+	//	if len(operator) == 1 {
67
+	//		valPtr := node.valNodeList[0]
68
+	//		if Compare(valPtr.val, value) == 0 {
69
+	//			op := operator[0]
70
+	//			cnt += valPtr.findOperatorList(op, sub, true)
71
+	//		}
72
+	//	} else {
73
+	//		leftop := operator[0]
74
+	//		logicalop := operator[1]
75
+	//		rightop := operator[2]
76
+	//
77
+	//		leftValuePtr := node.valNodeList[0]
78
+	//		rightValuePtr := node.valNodeList[1]
79
+	//
80
+	//		nodeValList := []int64{leftValuePtr.val[0], rightValuePtr.val[0]}
81
+	//
82
+	//		if Compare(nodeValList, value) == 0 {
83
+	//			if logicalop == "&&" {
84
+	//				cnt += leftValuePtr.findOperatorList(leftop, sub, false)
85
+	//				cnt += rightValuePtr.findOperatorList(rightop, sub, false)
86
+	//			} else {
87
+	//				cnt += leftValuePtr.findOperatorList(leftop, sub, true)
88
+	//				cnt += rightValuePtr.findOperatorList(rightop, sub, true)
89
+	//			}
90
+	//		}
91
+	//	}
92
+	//	if cnt == len(node.valNodeList) {
93
+	//		canFind = true
94
+	//		break
95
+	//	}
96
+	//}
97
+	//	}
98
+	//}
99
+	return canFind
100
+}
101
+
102
+// To Insert sub#
103
+func (manager *sub_manager) addSubscription(msg MsgUnit) error {
104
+	topic := msg.(SubscriptionImage).Topic
105
+	//value := msg.(SubscriptionMsg).Value
106
+	//operator := msg.(SubscriptionMsg).Operator
107
+	subnumber := 0
108
+
109
+	// 0. Check if same IP & same <Topic, Value> exists
110
+	if manager.isDuplicated(msg) == true {
111
+		return errors.New("Duplicater Subscription")
112
+	}
113
+
114
+	// 1. Mapping incoming IP address to sub #
115
+	if len(manager.emptylist) == 0 {
116
+		subnumber = manager.count_sub
117
+		manager.ip2sub[msg.(SubscriptionImage).From] = append(manager.ip2sub[msg.(SubscriptionImage).From], subnumber)
118
+		manager.sub2ip[subnumber] = msg.(SubscriptionImage).From
119
+
120
+		manager.subnum2conv[subnumber] = msg.(SubscriptionImage).Topic //conv용
121
+
122
+		manager.count_sub++
123
+
124
+	} else {
125
+		subnumber := manager.emptylist[len(manager.emptylist)-1]
126
+		manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
127
+		manager.ip2sub[msg.(SubscriptionImage).From] = append(manager.ip2sub[msg.(SubscriptionImage).From], subnumber)
128
+		manager.sub2ip[subnumber] = msg.(SubscriptionImage).From
129
+
130
+		manager.subnum2conv[subnumber] = msg.(SubscriptionImage).Topic //conv용
131
+	}
132
+
133
+	topicptr := manager.list.head
134
+	existTopic := false
135
+
136
+	// 2. Add Subscription
137
+
138
+	// Find topic in topiclist, add if not found
139
+	for topicptr != nil {
140
+		if ConvCompare(topicptr.topic, topic) == 0 {
141
+			existTopic = true
142
+			break
143
+		}
144
+		topicptr = topicptr.next
145
+	}
146
+
147
+	if !existTopic {
148
+		manager.list.addConvTopic(topic)
149
+		topicptr = manager.list.tail
150
+	}
151
+	fmt.Println("SAVED SUB SM ")
152
+	fmt.Println(FloatSlice2String(topic))
153
+
154
+	// 안씀 오류시 날려
155
+	//var addValNodeList []*valueNode
156
+
157
+	// if single expression
158
+	//if len(operator) == 1 {
159
+	//	valptr := topicptr.list.getValueNodePos(value)
160
+	//	op := operator[0]
161
+	//
162
+	//	if valptr == nil {
163
+	//		topicptr.list.addValueNode(value)
164
+	//		valptr = topicptr.list.tail
165
+	//	}
166
+	//
167
+	//	addValNodeList = append(addValNodeList, valptr)
168
+	//	manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
169
+	//	manager.israngesub[subnumber] = false
170
+	//	valptr.insertSub(op, subnumber, true)
171
+	//
172
+	//	return nil // AddSubscription ok
173
+	//} else {
174
+	//// if Multi expression
175
+	//// (ex) : { (234 < x) && (x <= 1293) } , { (234 < x) || (x < 1293) }
176
+	//
177
+	//leftOperator := operator[0]
178
+	//logicalOperator := operator[1] // For compound expressions bounded by '&&' and '||'
179
+	//rightOperator := operator[2]
180
+	//
181
+	//// Find ValueNode = (topiclist[topic].list.val == value)
182
+	//valptr1 := topicptr.list.getValueNodePos([]int64{value[0]})
183
+	//valptr2 := topicptr.list.getValueNodePos([]int64{value[1]})
184
+	//
185
+	//if valptr1 == nil {
186
+	//	topicptr.list.addValueNode([]int64{value[0]})
187
+	//	valptr1 = topicptr.list.tail
188
+	//}
189
+	//if valptr2 == nil {
190
+	//	topicptr.list.addValueNode([]int64{value[1]})
191
+	//	valptr2 = topicptr.list.tail
192
+	//}
193
+	//
194
+	//addValNodeList = append(addValNodeList, valptr1)
195
+	//addValNodeList = append(addValNodeList, valptr2)
196
+	//manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
197
+	//
198
+	//if logicalOperator == "&&" {
199
+	//	// If they are enclosed in '&&' -> Insert Value to range_operator_list
200
+	//	manager.israngesub[subnumber] = true
201
+	//	valptr1.insertSub(leftOperator, subnumber, false)
202
+	//	valptr2.insertSub(rightOperator, subnumber, false)
203
+	//
204
+	//} else {
205
+	//	// if they are enclosed in '||' -> Insert Value to single_operator_list
206
+	//	manager.israngesub[subnumber] = false
207
+	//	valptr1.insertSub(leftOperator, subnumber, true)
208
+	//	valptr2.insertSub(rightOperator, subnumber, true)
209
+	//}
210
+	//	return nil // addSubscription ok
211
+	//}
212
+	return nil
213
+	//return errors.New("Can't addSubscription")
214
+}
215
+
216
+// To delete subscriptions
217
+func (manager *sub_manager) delete(from string) error {
218
+	ip := from
219
+	cand := manager.ip2sub[ip]
220
+
221
+	for i := 0; i < len(cand); i++ {
222
+		sub := cand[i]
223
+
224
+		for j := 0; j < len(manager.sub2node[sub]); j++ {
225
+			nodeinfo := manager.sub2node[sub][j]
226
+			node := nodeinfo.valNodeList
227
+
228
+			if manager.israngesub[sub] {
229
+				for k := 0; k < len(node); k++ {
230
+					pos := findSub(node[k].range2sub_s, sub)
231
+					if pos != -1 {
232
+						node[k].range2sub_s = remove(node[k].range2sub_s, pos)
233
+						manager.emptylist = append(manager.emptylist, sub)
234
+					}
235
+
236
+					pos = findSub(node[k].range2sub_es, sub)
237
+					if pos != -1 {
238
+						node[k].range2sub_es = remove(node[k].range2sub_es, pos)
239
+						manager.emptylist = append(manager.emptylist, sub)
240
+					}
241
+
242
+					pos = findSub(node[k].range2sub_b, sub)
243
+					if pos != -1 {
244
+						node[k].range2sub_b = remove(node[k].range2sub_b, pos)
245
+						manager.emptylist = append(manager.emptylist, sub)
246
+					}
247
+
248
+					pos = findSub(node[k].range2sub_eb, sub)
249
+					if pos != -1 {
250
+						node[k].range2sub_eb = remove(node[k].range2sub_eb, pos)
251
+						manager.emptylist = append(manager.emptylist, sub)
252
+					}
253
+				}
254
+			} else {
255
+				for k := 0; k < len(node); k++ {
256
+					pos := findSub(node[k].single2sub_s, sub)
257
+					if pos != -1 {
258
+						node[k].single2sub_s = remove(node[k].single2sub_s, pos)
259
+						manager.emptylist = append(manager.emptylist, sub)
260
+					}
261
+
262
+					pos = findSub(node[k].single2sub_es, sub)
263
+					if pos != -1 {
264
+						node[k].single2sub_es = remove(node[k].single2sub_es, pos)
265
+						manager.emptylist = append(manager.emptylist, sub)
266
+					}
267
+
268
+					pos = findSub(node[k].single2sub_b, sub)
269
+					if pos != -1 {
270
+						node[k].single2sub_b = remove(node[k].single2sub_b, pos)
271
+						manager.emptylist = append(manager.emptylist, sub)
272
+					}
273
+
274
+					pos = findSub(node[k].single2sub_eb, sub)
275
+					if pos != -1 {
276
+						node[k].single2sub_eb = remove(node[k].single2sub_eb, pos)
277
+						manager.emptylist = append(manager.emptylist, sub)
278
+					}
279
+
280
+					pos = findSub(node[k].single2sub_e, sub)
281
+					if pos != -1 {
282
+						node[k].single2sub_e = remove(node[k].single2sub_e, pos)
283
+						manager.emptylist = append(manager.emptylist, sub)
284
+					}
285
+				}
286
+			}
287
+		}
288
+		manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
289
+		return nil
290
+	}
291
+	return errors.New("Don't Exist Subscription to delete")
292
+}
293
+
294
+func FloatSlice2String(target []float64) string {
295
+	var targetString string
296
+	targetString = ""
297
+	for _, value := range target {
298
+		targetString += strconv.FormatFloat(float64(value), 'f', -1, 64) + " "
299
+	}
300
+	return targetString
301
+}

+ 289
- 0
src/broker/modules/subscription_test.go Bestand weergeven

@@ -0,0 +1,289 @@
1
+package modules
2
+
3
+//
4
+//import (
5
+//	"fmt"
6
+//	_ "fmt"
7
+//	"github.com/stretchr/testify/assert"
8
+//	"math/rand"
9
+//	"strconv"
10
+//	"testing"
11
+//	"time"
12
+//)
13
+//
14
+//func makeData(IsAlpha bool) MsgUnit{
15
+//	rand.Seed(time.Now().UnixNano())
16
+//	// Set Ipaddr
17
+//	msg := Message{"", "1.0", "", SM}
18
+//	for i := 0; i < 4; i++{
19
+//		itoa := strconv.Itoa(rand.Int() % 256)
20
+//		msg.from += itoa
21
+//		if i != 3{
22
+//			msg.from += "."
23
+//		}
24
+//	}
25
+//
26
+//	// Set Time
27
+//	msg.time += strconv.Itoa(rand.Int()%24) + ":"
28
+//	msg.time += strconv.Itoa(rand.Int()%60)
29
+//
30
+//	// Set Topic, Value, Operator
31
+//	Topic := []int64{}
32
+//	Value := []int64{}
33
+//	Operator := []string{}
34
+//	candOp := []string{">", ">=", "<=" ,"<", "=="}
35
+//	logicalOp := []string{"&&", "||"}
36
+//
37
+//	if IsAlpha {
38
+//		topicLen := rand.Int() % 10 + 1
39
+//		for i := 0 ; i < topicLen; i++{
40
+//			Topic = append(Topic, rand.Int63())
41
+//		}
42
+//
43
+//		valueLen := rand.Int() % 10 + 1
44
+//		for i := 0 ; i < valueLen; i++{
45
+//			Value = append(Value, rand.Int63())
46
+//		}
47
+//
48
+//		Operator = append(Operator, "==")
49
+//
50
+//	} else{
51
+//		valueLen := rand.Int() % 2 + 1
52
+//
53
+//		Topic = append(Topic, rand.Int63())
54
+//		Value = append(Value, rand.Int63())
55
+//
56
+//		if valueLen == 1{
57
+//			Operator = append(Operator, candOp[rand.Int()%5])
58
+//		} else{
59
+//			Value = append(Value, rand.Int63())
60
+//			op := rand.Int()%2
61
+//			Operator = append(Operator, candOp[rand.Int()%2])
62
+//			if op == 0 {
63
+//				Operator = append(Operator, logicalOp[0])
64
+//			} else{
65
+//				Operator = append(Operator, logicalOp[1])
66
+//			}
67
+//			Operator = append(Operator, candOp[rand.Int()%2 + 2])
68
+//		}
69
+//	}
70
+//
71
+//	return &SubscriptionMsg{msg, Topic, Value, Operator, IsAlpha}
72
+//}
73
+//
74
+//func makeMsgList(dataLen int, IsAlpha bool) []MsgUnit{
75
+//	rand.Seed(time.Now().UnixNano())
76
+//	var ret []MsgUnit
77
+//	for i := 0 ; i < dataLen ; i++{
78
+//		ret = append(ret, makeData(IsAlpha))
79
+//	}
80
+//	return ret
81
+//}
82
+//
83
+//func checkOperatorList(isSingle bool, sub int, Operator string, l *valueNode) int{
84
+//	ret := -1
85
+//	if isSingle{
86
+//		switch Operator {
87
+//		case "<":
88
+//			ret = findSub(l.single2sub_s, sub)
89
+//		case "<=":
90
+//			ret = findSub(l.single2sub_es, sub)
91
+//		case ">":
92
+//			ret = findSub(l.single2sub_b, sub)
93
+//		case ">=":
94
+//			ret = findSub(l.single2sub_eb, sub)
95
+//		case "==":
96
+//			ret = findSub(l.single2sub_e, sub)
97
+//		}
98
+//
99
+//	} else{
100
+//		switch Operator {
101
+//		case "<":
102
+//			ret = findSub(l.range2sub_s, sub)
103
+//		case "<=":
104
+//			ret = findSub(l.range2sub_es, sub)
105
+//		case ">":
106
+//			ret = findSub(l.range2sub_b, sub)
107
+//		case ">=":
108
+//			ret = findSub(l.range2sub_eb, sub)
109
+//		}
110
+//	}
111
+//	return ret
112
+//}
113
+//
114
+//func watchData(msgList []MsgUnit, dataLen int, isSubscription bool){
115
+//	for i := 0; i < dataLen; i++{
116
+//		msg := msgList[i]
117
+//		if isSubscription {
118
+//			fmt.Println(
119
+//				"\nfrom = ", msg.(*SubscriptionMsg).Message.from,
120
+//				"\ntime = ", msg.(*SubscriptionMsg).Message.time,
121
+//				"\nTopic = ", msg.(*SubscriptionMsg).Topic,
122
+//				"\nValue = ", msg.(*SubscriptionMsg).Value,
123
+//				"\nOperator = ", msg.(*SubscriptionMsg).Operator,
124
+//				"\nIsAlpha ?= ", msg.(*SubscriptionMsg).IsAlpha,
125
+//			)
126
+//		} else{
127
+//			fmt.Println(
128
+//				"\nfrom = ", msg.(*PublishMsg).Message.from,
129
+//				"\ntime = ", msg.(*PublishMsg).Message.time,
130
+//				"\nTopic = ", msg.(*PublishMsg).Topic,
131
+//				"\nValue = ", msg.(*PublishMsg).Value,
132
+//			)
133
+//		}
134
+//	}
135
+//}
136
+//
137
+////Test addSubScription(1) (dif all [Topic, Value, Operator])
138
+//func Test_addSubscription_allDif(t *testing.T) {
139
+//	rand.Seed(time.Now().UnixNano())
140
+//
141
+//	// To Init sub_mng
142
+//	mos := Moscato{sub_mng: *newSubmng()}
143
+//
144
+//	// Make Data set(Subscription)
145
+//	var msgList []MsgUnit
146
+//	dataLen := 100
147
+//	msgList = makeMsgList(dataLen, false)
148
+//
149
+//	//Watch Data set
150
+//	//watchData(msgList, dataLen, true)
151
+//
152
+//	for i := 0; i < dataLen; i++ {
153
+//		msg := msgList[i]
154
+//		ip := msg.(*SubscriptionMsg).Message.from
155
+//		Topic := msg.(*SubscriptionMsg).Topic
156
+//		Value := msg.(*SubscriptionMsg).Value
157
+//		Operator := msg.(*SubscriptionMsg).Operator
158
+//		subnumber := mos.sub_mng.count_sub
159
+//		isSingle := true
160
+//
161
+//		// 0. Check addSubscription
162
+//		err := mos.sub_mng.addSubscription(msg)
163
+//		assert.Equal(t, nil, err)
164
+//
165
+//		// 1. Check if ip mapping is correct
166
+//		assert.Equal(t, subnumber, mos.sub_mng.ip2sub[ip][len(mos.sub_mng.ip2sub[ip])-1], "Ip mapping is failed")
167
+//
168
+//		// 2. Check topicNode
169
+//		topicPtr := mos.sub_mng.list.head
170
+//		for topicPtr != nil {
171
+//			if Compare(Topic, topicPtr.Topic) == 0 {
172
+//				break
173
+//			}
174
+//			topicPtr = topicPtr.next
175
+//		}
176
+//
177
+//		assert.Equal(t, Topic, topicPtr.Topic, "topicNode Add is failed")
178
+//
179
+//		// Check isSingle ?
180
+//		if len(Operator) == 3 && Operator[1] == "&&" {
181
+//			isSingle = false
182
+//		}
183
+//
184
+//		// 3. Check Value in ValueNode & Check Operator in ValueNode
185
+//		if !isSingle || (len(Operator) == 3 && Operator[1] == "||") {
186
+//			valptr1 := topicPtr.list.getValueNodePos([]int64{Value[0]})
187
+//			valptr2 := topicPtr.list.getValueNodePos([]int64{Value[1]})
188
+//
189
+//			assert.Equal(t, []int64{Value[0]}, valptr1.val)
190
+//			assert.Equal(t, []int64{Value[1]}, valptr2.val)
191
+//
192
+//			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, Operator[0], valptr1))
193
+//			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, Operator[2], valptr2))
194
+//
195
+//		} else {
196
+//			valptr := topicPtr.list.getValueNodePos(Value)
197
+//			assert.Equal(t, Value, valptr.val)
198
+//			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, Operator[0], valptr))
199
+//		}
200
+//	}
201
+//}
202
+//
203
+//// Test addSubScription(2) (same [Topic, Value] dif [Operator])
204
+//func Test_addSubscription_same_topicNvalue(t *testing.T) {
205
+//	rand.Seed(time.Now().UnixNano())
206
+//
207
+//	// To Init sub_mng
208
+//	mos := Moscato{sub_mng: *newSubmng()}
209
+//
210
+//	// Fix Topic & Value
211
+//	topicLen := rand.Int()%10 + 1
212
+//	staticTopic := make([]int64, topicLen)
213
+//	staticValue := []int64{rand.Int63()}
214
+//
215
+//
216
+//	// Make Data
217
+//	var msgList []MsgUnit
218
+//	dataLen := 100
219
+//	msgList = makeMsgList(dataLen, false)
220
+//
221
+//	for i := 0; i < topicLen; i++ {
222
+//		staticTopic[i] = rand.Int63()
223
+//	}
224
+//
225
+//	// Fix Same Topic & Value
226
+//	for i := 0; i < dataLen; i++ {
227
+//		Operator := msgList[i].(*SubscriptionMsg).Operator
228
+//		msgList[i].(*SubscriptionMsg).Topic = staticTopic
229
+//
230
+//		if len(Operator) == 1{
231
+//			msgList[i].(*SubscriptionMsg).Value = staticValue
232
+//		}
233
+//	}
234
+//
235
+//	// Watch Data set
236
+//	//watchData(msgList, dataLen, true)
237
+//
238
+//	for i := 0; i < dataLen; i++ {
239
+//		msg := msgList[i]
240
+//		err := mos.sub_mng.addSubscription(msg)
241
+//		assert.Equal(t, nil, err)
242
+//	}
243
+//
244
+//	topicPtr := mos.sub_mng.list.head
245
+//	for topicPtr != nil {
246
+//		if Compare(topicPtr.Topic, staticTopic) == 0 {
247
+//			break
248
+//		}
249
+//		topicPtr = topicPtr.next
250
+//	}
251
+//	watchValueNode(topicPtr.list.head)
252
+//
253
+//}
254
+//
255
+//func watchValueNode(ptr *valueNode) {
256
+//	valPtr := ptr
257
+//	for valPtr != nil{
258
+//		fmt.Println("Value = ", valPtr.val)
259
+//		if len(valPtr.single2sub_s) != 0 {
260
+//			fmt.Println("Single2sub (<) List = ", valPtr.single2sub_s)
261
+//		}
262
+//		if len(valPtr.single2sub_es) != 0 {
263
+//			fmt.Println("Single2sub (<=) List = ", valPtr.single2sub_es)
264
+//		}
265
+//		if len(valPtr.single2sub_b) != 0 {
266
+//			fmt.Println("Single2sub (>) List = ", valPtr.single2sub_b)
267
+//		}
268
+//		if len(valPtr.single2sub_eb) != 0 {
269
+//			fmt.Println("Single2sub (>=) List = ", valPtr.single2sub_eb)
270
+//		}
271
+//		if len(valPtr.single2sub_e) != 0 {
272
+//			fmt.Println("Single2sub (==) List = ", valPtr.single2sub_e)
273
+//		}
274
+//
275
+//		if len(valPtr.range2sub_s) != 0 {
276
+//			fmt.Println("range2sub (<) List = ", valPtr.range2sub_s)
277
+//		}
278
+//		if len(valPtr.range2sub_es) != 0 {
279
+//			fmt.Println("range2sub (<=) List = ", valPtr.range2sub_es)
280
+//		}
281
+//		if len(valPtr.range2sub_b) != 0 {
282
+//			fmt.Println("range2sub (>) List = ", valPtr.range2sub_b)
283
+//		}
284
+//		if len(valPtr.range2sub_eb) != 0 {
285
+//			fmt.Println("range2sub (>=) List = ", valPtr.range2sub_eb)
286
+//		}
287
+//		valPtr = valPtr.next
288
+//	}
289
+//}

+ 1
- 0
src/github.com/apex/log

@@ -0,0 +1 @@
1
+Subproject commit 8da83152b5d6177b4bfe3d12810a5afd25355170

+ 1
- 0
src/github.com/benbjohnson/clock

@@ -0,0 +1 @@
1
+Subproject commit 68df829297d4f02d5997a02f8c0e8d4b12f0b2a3

+ 1
- 0
src/github.com/bmizerany/assert

@@ -0,0 +1 @@
1
+Subproject commit b7ed37b82869576c289d7d97fb2bbd8b64a0cb28

+ 1
- 0
src/github.com/davecgh/go-spew

@@ -0,0 +1 @@
1
+Subproject commit d8f796af33cc11cb798c1aaeb27a4ebc5099927d

+ 1
- 0
src/github.com/facebookgo/ensure

@@ -0,0 +1 @@
1
+Subproject commit 63f1cf65ac4cf1f4bd7e93292149a456001b0e0b

+ 1
- 0
src/github.com/facebookgo/inject

@@ -0,0 +1 @@
1
+Subproject commit f23751cae28bef101a38b79266785d3b574cf7e8

+ 1
- 0
src/github.com/facebookgo/stack

@@ -0,0 +1 @@
1
+Subproject commit 751773369052141c013c6e827a71e8f35c07879c

+ 1
- 0
src/github.com/facebookgo/structtag

@@ -0,0 +1 @@
1
+Subproject commit 217e25fb96916cc60332e399c9aa63f5c422ceed

+ 1
- 0
src/github.com/facebookgo/subset

@@ -0,0 +1 @@
1
+Subproject commit c811ad88dec4edb3d7af0a88b34e6865d7460ba2

+ 1
- 0
src/github.com/go-kit/kit

@@ -0,0 +1 @@
1
+Subproject commit b34471ec558cb53ef29ca37cb4f93525d8fd6136

+ 1
- 0
src/github.com/go-kit/log

@@ -0,0 +1 @@
1
+Subproject commit 71fa7d7d64785c98d3814e53b3544fd8cccff897

+ 1
- 0
src/github.com/go-logfmt/logfmt

@@ -0,0 +1 @@
1
+Subproject commit 2fe45f2cf057d707c50bc1949e25ec2cd6b7a015

+ 1
- 0
src/github.com/go-stack/stack

@@ -0,0 +1 @@
1
+Subproject commit 93c7c7e3550c72bc91dead1452a0020142e2a902

+ 1
- 0
src/github.com/juliangruber/go-intersect

@@ -0,0 +1 @@
1
+Subproject commit 2e99d8c0a75f6975a52f7efeb81926a19b221214

+ 1
- 0
src/github.com/kr/pretty

@@ -0,0 +1 @@
1
+Subproject commit 0c671bcf5d4de20095c1ced9e581466bb5cc035d

+ 1
- 0
src/github.com/kr/text

@@ -0,0 +1 @@
1
+Subproject commit a988ae8c4d852fa9930b96d707cc0fc6b43c0787

+ 1
- 0
src/github.com/mattn/go-colorable

@@ -0,0 +1 @@
1
+Subproject commit e1bb79c8d53c38a60962ad4b8f658226cc983710

+ 1
- 0
src/github.com/mattn/go-isatty

@@ -0,0 +1 @@
1
+Subproject commit cdb00f17c375f76e4d01572d16de4eb14b48576a

+ 1
- 0
src/github.com/pkg/errors

@@ -0,0 +1 @@
1
+Subproject commit 5dd12d0cfe7f152f80558d591504ce685299311e

+ 1
- 0
src/github.com/pmezard/go-difflib

@@ -0,0 +1 @@
1
+Subproject commit 5d4384ee4fb2527b0a1256a821ebfc92f91efefc

+ 1
- 0
src/github.com/rogpeppe/go-internal

@@ -0,0 +1 @@
1
+Subproject commit 115ce09fd6b421993aafa8cdae0171429e9bd2c4

+ 1
- 0
src/github.com/rs/zerolog

@@ -0,0 +1 @@
1
+Subproject commit c0c2e11fc3cd04ae28d856789cf58ffd1666bc3f

+ 1
- 0
src/github.com/sirupsen/logrus

@@ -0,0 +1 @@
1
+Subproject commit 79c5ab66aa2ce7d9ff7b3c437ebc22fcc519a967

+ 1
- 0
src/github.com/stretchr/testify

@@ -0,0 +1 @@
1
+Subproject commit ab6dc3262822ed562480c19876b0257ace761e3e

+ 1
- 0
src/go.uber.org/atomic

@@ -0,0 +1 @@
1
+Subproject commit 3504dfaa1fa414923b1c8693f45d2f6931daf229

+ 1
- 0
src/go.uber.org/goleak

@@ -0,0 +1 @@
1
+Subproject commit 421b885faf413a221d1a86a17684fb3c45bd35c5

+ 1
- 0
src/go.uber.org/multierr

@@ -0,0 +1 @@
1
+Subproject commit d49c2ba57443d9ddb09d5716b7c971f8ecbdce24

+ 1
- 0
src/go.uber.org/zap

@@ -0,0 +1 @@
1
+Subproject commit 369c1bdf2a994a436ea89247432544ad7184020b

+ 1
- 0
src/golang.org/x/sys

@@ -0,0 +1 @@
1
+Subproject commit 1d35b9e2eb4edf581781c7f3e2a36fac701f0a24

+ 1
- 0
src/google.golang.org/grpc

@@ -0,0 +1 @@
1
+Subproject commit fbaf7c55821070944bb0ce342ba3c54cc521c6fe

+ 1
- 0
src/gopkg.in/inconshreveable/log15.v2

@@ -0,0 +1 @@
1
+Subproject commit b30bc20e4fd12cec79a9aae62e91cfcf458bd253

+ 1
- 0
src/gopkg.in/yaml.v2

@@ -0,0 +1 @@
1
+Subproject commit 7649d4548cb53a614db133b2a8ac1f31859dda8c

+ 1
- 0
src/gopkg.in/yaml.v3

@@ -0,0 +1 @@
1
+Subproject commit 496545a6307b2a7d7a710fd516e5e16e8ab62dbc

+ 362826
- 0
test.log
Diff onderdrukt omdat het te groot bestand
Bestand weergeven


Laden…
Annuleren
Opslaan