|
|
@@ -1,11 +1,11 @@
|
|
1
|
1
|
package modules
|
|
2
|
2
|
|
|
3
|
3
|
import (
|
|
4
|
|
- "broker/modules/github.com/fatih/color"
|
|
5
|
4
|
"encoding/json"
|
|
6
|
5
|
"errors"
|
|
7
|
6
|
"fmt"
|
|
8
|
|
- "log"
|
|
|
7
|
+ "github.com/fatih/color"
|
|
|
8
|
+ l4g "github.com/jeanphorn/log4go"
|
|
9
|
9
|
"net"
|
|
10
|
10
|
"net/rpc"
|
|
11
|
11
|
)
|
|
|
@@ -13,8 +13,8 @@ import (
|
|
13
|
13
|
// TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
|
|
14
|
14
|
|
|
15
|
15
|
type Moscato struct {
|
|
16
|
|
- queue MsgQueue //메세지 받는 큐
|
|
17
|
|
- SendQueue chan myType //Publish Message가 들어왔을때 매칭이 된 결과물이 담김
|
|
|
16
|
+ queue MsgQueue
|
|
|
17
|
+ SendQueue chan myType
|
|
18
|
18
|
MicroServiceManager NodeManager `inject:""`
|
|
19
|
19
|
MatchingManager match_manager
|
|
20
|
20
|
SubscriptionManager sub_manager
|
|
|
@@ -22,8 +22,8 @@ type Moscato struct {
|
|
22
|
22
|
}
|
|
23
|
23
|
|
|
24
|
24
|
type myType struct {
|
|
25
|
|
- subList []string //sub의 ip주소를 가지고 있음
|
|
26
|
|
- pubMsg MsgUnit //Publish Message
|
|
|
25
|
+ subList []string
|
|
|
26
|
+ pubMsg MsgUnit
|
|
27
|
27
|
err error
|
|
28
|
28
|
}
|
|
29
|
29
|
|
|
|
@@ -36,13 +36,11 @@ type Receiver struct { //RPC 서버에 등록하기 위한 변수
|
|
36
|
36
|
}
|
|
37
|
37
|
|
|
38
|
38
|
type Args struct { // 매개변수
|
|
39
|
|
- JsonMsg []byte //JSON 타입으로 바뀐 메세지
|
|
40
|
|
- Kind int //메세지 종류
|
|
|
39
|
+ JsonMsg []byte
|
|
|
40
|
+ Kind int
|
|
41
|
41
|
}
|
|
42
|
42
|
|
|
43
|
43
|
/*
|
|
44
|
|
-MM:MessageMiddleWare
|
|
45
|
|
-MS:MicroService
|
|
46
|
44
|
MS→MM
|
|
47
|
45
|
|
|
48
|
46
|
-MM 실행되면 MM서버는 열려있음(MS은 자동으로 Client)(포트 8150)
|
|
|
@@ -52,13 +50,13 @@ MS→MM
|
|
52
|
50
|
-MmReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MM의 Receive로 보냄
|
|
53
|
51
|
|
|
54
|
52
|
-MM의 Receive에서 해당 Message를 처리
|
|
55
|
|
- */
|
|
|
53
|
+*/
|
|
56
|
54
|
|
|
57
|
|
-func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
58
|
|
- // 메세지 타입별로 나눠서 언마샬
|
|
|
55
|
+func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
|
|
|
56
|
+ // 메세지 별로 나눠서 언마샬
|
|
59
|
57
|
switch args.Kind {
|
|
60
|
58
|
|
|
61
|
|
- case KSM://KeyShareMessage
|
|
|
59
|
+ case KSM:
|
|
62
|
60
|
var msg KeyShareMsg
|
|
63
|
61
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
64
|
62
|
if err != nil {
|
|
|
@@ -71,7 +69,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
71
|
69
|
}
|
|
72
|
70
|
}()
|
|
73
|
71
|
reply.CompleteLog = "received"
|
|
74
|
|
- case PM://PublishMessage
|
|
|
72
|
+ case PM:
|
|
75
|
73
|
var msg PublishMsg
|
|
76
|
74
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
77
|
75
|
if err != nil {
|
|
|
@@ -85,7 +83,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
85
|
83
|
}
|
|
86
|
84
|
}()
|
|
87
|
85
|
reply.CompleteLog = "PM received"
|
|
88
|
|
- case SM://SubscriptionMessage
|
|
|
86
|
+ case SM:
|
|
89
|
87
|
var msg SubscriptionMsg
|
|
90
|
88
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
91
|
89
|
if err != nil {
|
|
|
@@ -98,7 +96,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
98
|
96
|
}
|
|
99
|
97
|
}()
|
|
100
|
98
|
reply.CompleteLog = "SM received"
|
|
101
|
|
- case RM://RegisterMessage
|
|
|
99
|
+ case RM:
|
|
102
|
100
|
var msg RegisterMsg
|
|
103
|
101
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
104
|
102
|
if err != nil {
|
|
|
@@ -111,7 +109,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
111
|
109
|
}
|
|
112
|
110
|
}()
|
|
113
|
111
|
reply.CompleteLog = "RM received"
|
|
114
|
|
- case WM://WithdrawMessage
|
|
|
112
|
+ case WM:
|
|
115
|
113
|
var msg WithdrawMsg
|
|
116
|
114
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
117
|
115
|
if err != nil {
|
|
|
@@ -124,8 +122,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
124
|
122
|
}
|
|
125
|
123
|
}()
|
|
126
|
124
|
reply.CompleteLog = "WM received"
|
|
127
|
|
- default://MM이 받을 타입의 메세지가 아닌 경우
|
|
128
|
|
- reply.CompleteLog="This type is inappropriate"
|
|
|
125
|
+ default:
|
|
129
|
126
|
return errors.New("message type Error: Not registered message type")
|
|
130
|
127
|
}
|
|
131
|
128
|
//reply.CompleteLog = "received completely"
|
|
|
@@ -133,34 +130,43 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
133
|
130
|
}
|
|
134
|
131
|
//Recieve - MM가 MS로부터 메세지 전달받음
|
|
135
|
132
|
func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
|
133
|
+ l4g.LoadConfiguration("modules/logConfig.json")
|
|
|
134
|
+ //rpc call
|
|
136
|
135
|
var msg_type = msg.CheckType()
|
|
137
|
136
|
//메세지 타입에 따라 다르게 처리
|
|
138
|
137
|
switch msg_type {
|
|
139
|
138
|
|
|
140
|
|
- case KSM: //KeyShareMessage - 추후 구현
|
|
|
139
|
+ case KSM: //Key share msg
|
|
141
|
140
|
|
|
142
|
|
- case PM: //PublishMessage
|
|
143
|
|
- log.Println("PM received")
|
|
|
141
|
+ case PM: //Publish msg
|
|
|
142
|
+ //log.Println("PM received")
|
|
|
143
|
+ l4g.LOGGER("Test").Info("PM received")
|
|
144
|
144
|
moscato.queue.push(moscato.preProcessMsg(msg))
|
|
145
|
145
|
|
|
146
|
|
- case SM: //SubscriptionMessage
|
|
147
|
|
- log.Println("SM received")
|
|
|
146
|
+ case SM: //Subscription msg
|
|
|
147
|
+ //log.Println("SM received")
|
|
|
148
|
+ l4g.LOGGER("Test").Info("SM received")
|
|
148
|
149
|
err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
|
|
149
|
150
|
if err != nil {
|
|
150
|
151
|
println(err)
|
|
|
152
|
+ //return nil, err
|
|
151
|
153
|
}
|
|
152
|
154
|
|
|
153
|
|
- case RM: //RegisterMessage
|
|
154
|
|
- log.Println("RM received")
|
|
|
155
|
+ case RM: //Register msg
|
|
|
156
|
+ //log.Println("RM received")
|
|
|
157
|
+ l4g.LOGGER("Test").Info("RM received")
|
|
|
158
|
+ l4g.LOGGER("Test").Debug("RM received")
|
|
155
|
159
|
var newMsg RegisterMsg
|
|
156
|
160
|
newMsg = msg.(RegisterMsg)
|
|
157
|
161
|
|
|
158
|
162
|
newNode := MSNode{newMsg.From, newMsg.From}
|
|
159
|
163
|
resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
|
|
160
|
164
|
if resultAddNode {
|
|
161
|
|
- log.Println("Node added successful")
|
|
|
165
|
+ //log.Println("Node added successful")
|
|
|
166
|
+ l4g.LOGGER("Test").Info("Node added successful")
|
|
162
|
167
|
} else {
|
|
163
|
|
- log.Println("Node is already added, ignore RM")
|
|
|
168
|
+ l4g.LOGGER("Test").Error("Node is already added, ignore RM")
|
|
|
169
|
+ //log.Println("Node is already added, ignore RM")
|
|
164
|
170
|
return msg, nil
|
|
165
|
171
|
}
|
|
166
|
172
|
|
|
|
@@ -173,13 +179,16 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
173
|
179
|
go moscato.Send2MS(addr, newMsg)
|
|
174
|
180
|
|
|
175
|
181
|
case WM: //Withdraw msg
|
|
176
|
|
- ip := msg.(WithdrawMsg).From
|
|
|
182
|
+ //ip := msg.(WithdrawMsg).From
|
|
|
183
|
+ //sublist := moscato.SubscriptionManager.ip2sub[ip]
|
|
|
184
|
+ //fmt.Println("prev list = ", sublist)
|
|
177
|
185
|
moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
|
|
178
|
186
|
moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
|
|
179
|
|
- moscato.SubscriptionManager.delete(ip) //해당 Ip주소를 가진 Subscription 정보 삭제
|
|
|
187
|
+ //moscato.SubscriptionManager.delete(ip)
|
|
|
188
|
+ //sublist2 := moscato.SubscriptionManager.ip2sub[ip]
|
|
|
189
|
+ //fmt.Println("after list =", sublist2)
|
|
180
|
190
|
|
|
181
|
|
- default://MM이 받을 메세지 타입이 아닌경우
|
|
182
|
|
- log.Println("Message type Error: Not registered message type")
|
|
|
191
|
+ default:
|
|
183
|
192
|
return nil, errors.New("Message type Error: Not registered message type")
|
|
184
|
193
|
}
|
|
185
|
194
|
|
|
|
@@ -201,6 +210,7 @@ MM→MS
|
|
201
|
210
|
*/
|
|
202
|
211
|
|
|
203
|
212
|
func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
|
|
|
213
|
+ l4g.LoadConfiguration("logConfig.json")
|
|
204
|
214
|
client, err := rpc.Dial("tcp", ipaddress+":8150")
|
|
205
|
215
|
if err != nil {
|
|
206
|
216
|
fmt.Println(err)
|
|
|
@@ -221,7 +231,8 @@ func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
|
|
221
|
231
|
}
|
|
222
|
232
|
//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
|
|
223
|
233
|
// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
|
|
224
|
|
- log.Println(reply.CompleteLog)
|
|
|
234
|
+ //log.Println(reply.CompleteLog)
|
|
|
235
|
+ l4g.LOGGER("Test").Info(reply.CompleteLog)
|
|
225
|
236
|
}
|
|
226
|
237
|
|
|
227
|
238
|
//Matching을 용이하게 하기위한 메세지 가공 과정
|
|
|
@@ -286,7 +297,6 @@ func (moscato *Moscato) Run() {
|
|
286
|
297
|
go func() {
|
|
287
|
298
|
for {
|
|
288
|
299
|
msg := moscato.queue.pop(true)
|
|
289
|
|
- fmt.Println(msg)
|
|
290
|
300
|
go moscato.Matching(msg)
|
|
291
|
301
|
go moscato.SendWithEncrypt()
|
|
292
|
302
|
}
|
|
|
@@ -297,7 +307,7 @@ func (moscato *Moscato) Run() {
|
|
297
|
307
|
//rpc 등록 -> Receive 함수
|
|
298
|
308
|
err = rpc.Register(receiver)
|
|
299
|
309
|
if err != nil {
|
|
300
|
|
- log.Println(err)
|
|
|
310
|
+ println(err)
|
|
301
|
311
|
return
|
|
302
|
312
|
}
|
|
303
|
313
|
|
|
|
@@ -317,7 +327,8 @@ func Listen() {
|
|
317
|
327
|
l, err1 := net.Listen("tcp", ":8160")
|
|
318
|
328
|
|
|
319
|
329
|
if err1 != nil {
|
|
320
|
|
- log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
|
|
|
330
|
+ //log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
|
|
|
331
|
+ l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
|
|
321
|
332
|
}
|
|
322
|
333
|
defer l.Close()
|
|
323
|
334
|
|
|
|
@@ -325,4 +336,4 @@ func Listen() {
|
|
325
|
336
|
conn, _ := l.Accept()
|
|
326
|
337
|
go rpc.ServeConn(conn)
|
|
327
|
338
|
}
|
|
328
|
|
-}
|
|
|
339
|
+}
|