jaehoon_kim 4 лет назад
Родитель
Сommit
85f6c29e5b
2 измененных файлов: 89 добавлений и 78 удалений
  1. 74
    70
      src/broker/modules/init.go
  2. 15
    8
      src/broker/modules/manage.go

+ 74
- 70
src/broker/modules/init.go Просмотреть файл

1
 package modules
1
 package modules
2
 
2
 
3
 import (
3
 import (
4
+	"broker/modules/github.com/fatih/color"
4
 	"encoding/json"
5
 	"encoding/json"
5
 	"errors"
6
 	"errors"
6
 	"fmt"
7
 	"fmt"
7
-	"github.com/fatih/color"
8
 	"log"
8
 	"log"
9
 	"net"
9
 	"net"
10
 	"net/rpc"
10
 	"net/rpc"
13
 // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
13
 // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
14
 
14
 
15
 type Moscato struct {
15
 type Moscato struct {
16
-	queue               MsgQueue
17
-	SendQueue           chan myType
16
+	queue               MsgQueue //메세지 받는 큐
17
+	SendQueue           chan myType //Publish Message가 들어왔을때 매칭이 된 결과물이 담김
18
 	MicroServiceManager NodeManager `inject:""`
18
 	MicroServiceManager NodeManager `inject:""`
19
 	MatchingManager     match_manager
19
 	MatchingManager     match_manager
20
 	SubscriptionManager sub_manager
20
 	SubscriptionManager sub_manager
22
 }
22
 }
23
 
23
 
24
 type myType struct {
24
 type myType struct {
25
-	subList []string
26
-	pubMsg  MsgUnit
25
+	subList []string //sub의 ip주소를 가지고 있음
26
+	pubMsg  MsgUnit //Publish Message
27
 	err     error
27
 	err     error
28
 }
28
 }
29
 
29
 
36
 }
36
 }
37
 
37
 
38
 type Args struct { // 매개변수
38
 type Args struct { // 매개변수
39
-	JsonMsg []byte
40
-	Kind    int
39
+	JsonMsg []byte //JSON 타입으로 바뀐 메세지
40
+	Kind    int //메세지 종류
41
 }
41
 }
42
 
42
 
43
 /*
43
 /*
44
+MM:MessageMiddleWare
45
+MS:MicroService
44
 MS→MM
46
 MS→MM
45
 
47
 
46
 -MM 실행되면 MM서버는 열려있음(MS은 자동으로 Client)(포트 8150)
48
 -MM 실행되면 MM서버는 열려있음(MS은 자동으로 Client)(포트 8150)
50
 -MmReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MM의 Receive로 보냄
52
 -MmReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MM의 Receive로 보냄
51
 
53
 
52
 -MM의 Receive에서 해당 Message를 처리
54
 -MM의 Receive에서 해당 Message를 처리
53
-*/
55
+ */
54
 
56
 
55
-func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
56
-	// 메세지 별로 나눠서 언마샬
57
+func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
58
+	// 메세지 타입별로 나눠서 언마샬
57
 	switch args.Kind {
59
 	switch args.Kind {
58
 
60
 
59
-	case KSM:
61
+	case KSM://KeyShareMessage
60
 		var msg KeyShareMsg
62
 		var msg KeyShareMsg
61
 		err := json.Unmarshal(args.JsonMsg, &msg)
63
 		err := json.Unmarshal(args.JsonMsg, &msg)
62
 		if err != nil {
64
 		if err != nil {
69
 			}
71
 			}
70
 		}()
72
 		}()
71
 		reply.CompleteLog = "received"
73
 		reply.CompleteLog = "received"
72
-	case PM:
74
+	case PM://PublishMessage
73
 		var msg PublishMsg
75
 		var msg PublishMsg
74
 		err := json.Unmarshal(args.JsonMsg, &msg)
76
 		err := json.Unmarshal(args.JsonMsg, &msg)
75
 		if err != nil {
77
 		if err != nil {
83
 			}
85
 			}
84
 		}()
86
 		}()
85
 		reply.CompleteLog = "PM received"
87
 		reply.CompleteLog = "PM received"
86
-	case SM:
88
+	case SM://SubscriptionMessage
87
 		var msg SubscriptionMsg
89
 		var msg SubscriptionMsg
88
 		err := json.Unmarshal(args.JsonMsg, &msg)
90
 		err := json.Unmarshal(args.JsonMsg, &msg)
89
 		if err != nil {
91
 		if err != nil {
96
 			}
98
 			}
97
 		}()
99
 		}()
98
 		reply.CompleteLog = "SM received"
100
 		reply.CompleteLog = "SM received"
99
-	case RM:
101
+	case RM://RegisterMessage
100
 		var msg RegisterMsg
102
 		var msg RegisterMsg
101
 		err := json.Unmarshal(args.JsonMsg, &msg)
103
 		err := json.Unmarshal(args.JsonMsg, &msg)
102
 		if err != nil {
104
 		if err != nil {
109
 			}
111
 			}
110
 		}()
112
 		}()
111
 		reply.CompleteLog = "RM received"
113
 		reply.CompleteLog = "RM received"
112
-	case WM:
114
+	case WM://WithdrawMessage
113
 		var msg WithdrawMsg
115
 		var msg WithdrawMsg
114
 		err := json.Unmarshal(args.JsonMsg, &msg)
116
 		err := json.Unmarshal(args.JsonMsg, &msg)
115
 		if err != nil {
117
 		if err != nil {
122
 			}
124
 			}
123
 		}()
125
 		}()
124
 		reply.CompleteLog = "WM received"
126
 		reply.CompleteLog = "WM received"
125
-	default:
127
+	default://MM이 받을 타입의 메세지가 아닌 경우
128
+		reply.CompleteLog="This type is inappropriate"
126
 		return errors.New("message type Error: Not registered message type")
129
 		return errors.New("message type Error: Not registered message type")
127
 	}
130
 	}
128
 	//reply.CompleteLog = "received completely"
131
 	//reply.CompleteLog = "received completely"
129
 	return nil
132
 	return nil
130
 }
133
 }
131
-
132
-func (moscato *Moscato) preProcessMsg(originalMsg MsgUnit) MsgUnit {
133
-	if originalMsg.CheckType() == PM {
134
-		pubMsg := originalMsg.(PublishMsg)
135
-		for index := 0; index < len(pubMsg.Topic); index++ {
136
-			pubMsg.Topic[index] = pubMsg.Topic[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
137
-		}
138
-		for index := 0; index < len(pubMsg.Value); index++ {
139
-			pubMsg.Value[index] = pubMsg.Value[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
140
-		}
141
-		return pubMsg
142
-	} else if originalMsg.CheckType() == SM {
143
-		subMsg := originalMsg.(SubscriptionMsg)
144
-		for index := 0; index < len(subMsg.Topic); index++ {
145
-			subMsg.Topic[index] = subMsg.Topic[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
146
-		}
147
-		for index := 0; index < len(subMsg.Value); index++ {
148
-			subMsg.Value[index] = subMsg.Value[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
149
-		}
150
-		return subMsg
151
-	}
152
-	return nil
153
-}
154
-
155
-func (moscato *Moscato) SendWithEncrypt() MsgUnit {
156
-	for {
157
-		mt := <-moscato.SendQueue
158
-		//fmt.Println(mt)
159
-		if mt.err == nil {
160
-			for index := 0; index < len(mt.subList); index++ {
161
-				tmpNode := mt.subList[index]
162
-				tmpNodeIpAddr, _ := moscato.MicroServiceManager.GetIpaddr(tmpNode)
163
-				//moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode)
164
-				//fmt.Println("publish: ", mt.pubMsg)
165
-				//moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg)
166
-				moscato.Send2MS(tmpNodeIpAddr, moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode))
167
-			}
168
-		}
169
-		return nil
170
-	}
171
-}
172
-
173
-//Recieve - MM가 msg전달 받음
134
+//Recieve - MM가 MS로부터 메세지 전달받음
174
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
135
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
175
-	//rpc call
176
 	var msg_type = msg.CheckType()
136
 	var msg_type = msg.CheckType()
177
 	//메세지 타입에 따라 다르게 처리
137
 	//메세지 타입에 따라 다르게 처리
178
 	switch msg_type {
138
 	switch msg_type {
179
 
139
 
180
-	case KSM: //Key share msg
181
-		
182
-	case PM: //Publish msg
140
+	case KSM: //KeyShareMessage - 추후 구현
141
+
142
+	case PM: //PublishMessage
183
 		log.Println("PM received")
143
 		log.Println("PM received")
184
 		moscato.queue.push(moscato.preProcessMsg(msg))
144
 		moscato.queue.push(moscato.preProcessMsg(msg))
185
 
145
 
186
-	case SM: //Subscription msg
146
+	case SM: //SubscriptionMessage
187
 		log.Println("SM received")
147
 		log.Println("SM received")
188
 		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
148
 		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
189
 		if err != nil {
149
 		if err != nil {
190
 			println(err)
150
 			println(err)
191
-			//return nil, err
192
 		}
151
 		}
193
 
152
 
194
-	case RM: //Register msg
153
+	case RM: //RegisterMessage
195
 		log.Println("RM received")
154
 		log.Println("RM received")
196
 		var newMsg RegisterMsg
155
 		var newMsg RegisterMsg
197
 		newMsg = msg.(RegisterMsg)
156
 		newMsg = msg.(RegisterMsg)
217
 		ip := msg.(WithdrawMsg).From
176
 		ip := msg.(WithdrawMsg).From
218
 		moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
177
 		moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
219
 		moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
178
 		moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
220
-		moscato.SubscriptionManager.delete(ip)
179
+		moscato.SubscriptionManager.delete(ip) //해당 Ip주소를 가진 Subscription 정보 삭제
221
 
180
 
222
-	default:
181
+	default://MM이 받을 메세지 타입이 아닌경우
182
+		log.Println("Message type Error: Not registered message type")
223
 		return nil, errors.New("Message type Error: Not registered message type")
183
 		return nil, errors.New("Message type Error: Not registered message type")
224
 	}
184
 	}
225
 
185
 
264
 	log.Println(reply.CompleteLog)
224
 	log.Println(reply.CompleteLog)
265
 }
225
 }
266
 
226
 
227
+//Matching을 용이하게 하기위한 메세지 가공 과정
228
+func (moscato *Moscato) preProcessMsg(originalMsg MsgUnit) MsgUnit {
229
+	if originalMsg.CheckType() == PM {
230
+		pubMsg := originalMsg.(PublishMsg)
231
+		for index := 0; index < len(pubMsg.Topic); index++ {
232
+			pubMsg.Topic[index] = pubMsg.Topic[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
233
+		}
234
+		for index := 0; index < len(pubMsg.Value); index++ {
235
+			pubMsg.Value[index] = pubMsg.Value[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
236
+		}
237
+		return pubMsg
238
+	} else if originalMsg.CheckType() == SM {
239
+		subMsg := originalMsg.(SubscriptionMsg)
240
+		for index := 0; index < len(subMsg.Topic); index++ {
241
+			subMsg.Topic[index] = subMsg.Topic[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
242
+		}
243
+		for index := 0; index < len(subMsg.Value); index++ {
244
+			subMsg.Value[index] = subMsg.Value[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
245
+		}
246
+		return subMsg
247
+	}
248
+	return nil
249
+}
250
+
251
+//암호화 해서 보내기
252
+func (moscato *Moscato) SendWithEncrypt() MsgUnit {
253
+	for {
254
+		mt := <-moscato.SendQueue
255
+		if mt.err == nil {
256
+			for index := 0; index < len(mt.subList); index++ { //sublist들을 돌면서 매세지를 encrypt하여 메세지 보냄
257
+				tmpNode := mt.subList[index]
258
+				tmpNodeIpAddr, _ := moscato.MicroServiceManager.GetIpaddr(tmpNode)
259
+				//moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode)
260
+				//fmt.Println("publish: ", mt.pubMsg)
261
+				//moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg)
262
+				moscato.Send2MS(tmpNodeIpAddr, moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode))
263
+			}
264
+		}
265
+		return nil
266
+	}
267
+}
268
+
267
 func (moscato *Moscato) Run() {
269
 func (moscato *Moscato) Run() {
268
 
270
 
269
 	config := AppConfig{moscato}
271
 	config := AppConfig{moscato}
284
 	go func() {
286
 	go func() {
285
 		for {
287
 		for {
286
 			msg := moscato.queue.pop(true)
288
 			msg := moscato.queue.pop(true)
289
+			fmt.Println(msg)
287
 			go moscato.Matching(msg)
290
 			go moscato.Matching(msg)
288
 			go moscato.SendWithEncrypt()
291
 			go moscato.SendWithEncrypt()
289
 		}
292
 		}
300
 
303
 
301
 	go Listen()
304
 	go Listen()
302
 	color.Blue("initializing complete.")
305
 	color.Blue("initializing complete.")
306
+
303
 	fmt.Scanln()
307
 	fmt.Scanln()
304
 }
308
 }
305
 
309
 
306
 func Listen() {
310
 func Listen() {
307
 	/*
311
 	/*
308
-		MS→MM일때 ⇒ port : 8160으로 열기
312
+	MS→MM일때 ⇒ port : 8160으로 열기
309
 
313
 
310
-		(MM이 Server, MS가 Client)
314
+	(MM이 Server, MS가 Client)
311
 	*/
315
 	*/
312
 
316
 
313
 	l, err1 := net.Listen("tcp", ":8160")
317
 	l, err1 := net.Listen("tcp", ":8160")

+ 15
- 8
src/broker/modules/manage.go Просмотреть файл

7
 
7
 
8
 //각 Microservice에 대한 정보 저장 노드
8
 //각 Microservice에 대한 정보 저장 노드
9
 type MSNode struct {
9
 type MSNode struct {
10
-	nodeName string
10
+	nodeName string //Nodename- 현재 데모에서는 IpAddress와 같음
11
 	ipAddr   string
11
 	ipAddr   string
12
 }
12
 }
13
 
13
 
14
+//Nodename 반환
14
 func (node *MSNode) GetName() string {
15
 func (node *MSNode) GetName() string {
15
 	return node.nodeName
16
 	return node.nodeName
16
 }
17
 }
18
+
19
+//IpAddress 반환
17
 func (node *MSNode) GetIpaddr() string {
20
 func (node *MSNode) GetIpaddr() string {
18
 	return node.ipAddr
21
 	return node.ipAddr
19
 }
22
 }
20
 
23
 
21
 type NodeManager interface {
24
 type NodeManager interface {
22
-	GetIpaddr(nodeName string) (string, bool)
23
-	AddMicroservice(node MSNode) bool
24
-	RemoveMicroservice(nodeName string) bool
25
+	GetIpaddr(nodeName string) (string, bool) //IpAddress반환
26
+	AddMicroservice(node MSNode) bool //MS추가
27
+	RemoveMicroservice(nodeName string) bool //MS삭제
25
 }
28
 }
26
 
29
 
27
 //모든 Microservice정보 저장
30
 //모든 Microservice정보 저장
29
 	NodeTable map[string]MSNode
32
 	NodeTable map[string]MSNode
30
 }
33
 }
31
 
34
 
35
+//MStable 생성자
32
 func NewMStable() *MStable {
36
 func NewMStable() *MStable {
33
 	defer fmt.Println("node manager setting complete.")
37
 	defer fmt.Println("node manager setting complete.")
34
 	return &MStable{NodeTable: make(map[string]MSNode)}
38
 	return &MStable{NodeTable: make(map[string]MSNode)}
35
 }
39
 }
36
 
40
 
41
+//IpAddress반환
37
 func (manager *MStable) GetIpaddr(nodeName string) (string, bool) {
42
 func (manager *MStable) GetIpaddr(nodeName string) (string, bool) {
38
 	//해당 이름의 노드이름이 존재하는지 확인
43
 	//해당 이름의 노드이름이 존재하는지 확인
39
 	node, exists := manager.NodeTable[nodeName]
44
 	node, exists := manager.NodeTable[nodeName]
46
 	}
51
 	}
47
 }
52
 }
48
 
53
 
54
+//MS추가
49
 func (manager *MStable) AddMicroservice(node MSNode) bool {
55
 func (manager *MStable) AddMicroservice(node MSNode) bool {
50
 	//삽입 전 존재여부 확인
56
 	//삽입 전 존재여부 확인
51
-	_, exists := manager.NodeTable[node.GetName()]
57
+	_, exists := manager.NodeTable[node.GetName()] ////해당 Node의 이름이 있는지 검색
52
 
58
 
53
 	if exists {
59
 	if exists {
54
 		return false
60
 		return false
55
-	} else {
61
+	} else { //존재안한다면 추가 (존재할경우 이미 있는것이기 때문에 추가할 필요 없음)
56
 		manager.NodeTable[node.GetName()] = node
62
 		manager.NodeTable[node.GetName()] = node
57
 		return true
63
 		return true
58
 	}
64
 	}
59
 }
65
 }
60
 
66
 
67
+//MS삭제
61
 func (manager *MStable) RemoveMicroservice(nodeName string) bool {
68
 func (manager *MStable) RemoveMicroservice(nodeName string) bool {
62
 	//삭제 전 존재여부 확인
69
 	//삭제 전 존재여부 확인
63
-	_, exists := manager.NodeTable[nodeName]
70
+	_, exists := manager.NodeTable[nodeName] //해당 이름을 가진 Node가 있는지 검색
64
 
71
 
65
 	if exists {
72
 	if exists {
66
-		delete(manager.NodeTable, nodeName)
73
+		delete(manager.NodeTable, nodeName) //존재한다면 삭제
67
 		log.Println("[" + nodeName + "] : service quit")
74
 		log.Println("[" + nodeName + "] : service quit")
68
 		return true
75
 		return true
69
 	} else {
76
 	} else {

Загрузка…
Отмена
Сохранить