|
|
@@ -4,14 +4,14 @@ import (
|
|
4
|
4
|
"encoding/json"
|
|
5
|
5
|
"errors"
|
|
6
|
6
|
"fmt"
|
|
7
|
|
- "github.com/fatih/color"
|
|
8
|
|
- l4g "github.com/jeanphorn/log4go"
|
|
9
|
7
|
"net"
|
|
10
|
8
|
"net/rpc"
|
|
|
9
|
+ "os"
|
|
|
10
|
+ "os/signal"
|
|
|
11
|
+ "strconv"
|
|
|
12
|
+ "syscall"
|
|
11
|
13
|
)
|
|
12
|
14
|
|
|
13
|
|
-// TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
|
|
14
|
|
-
|
|
15
|
15
|
type Moscato struct {
|
|
16
|
16
|
queue MsgQueue
|
|
17
|
17
|
SendQueue chan myType
|
|
|
@@ -68,7 +68,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
|
|
68
|
68
|
|
|
69
|
69
|
}
|
|
70
|
70
|
}()
|
|
71
|
|
- reply.CompleteLog = "received"
|
|
|
71
|
+ reply.CompleteLog = "KSM received"
|
|
72
|
72
|
case PM:
|
|
73
|
73
|
var msg PublishMsg
|
|
74
|
74
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
|
@@ -131,7 +131,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
|
|
131
|
131
|
|
|
132
|
132
|
//Recieve - MM가 MS로부터 메세지 전달받음
|
|
133
|
133
|
func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
134
|
|
- var logger = newLogger()
|
|
|
134
|
+ logger := NewMyLogger()
|
|
135
|
135
|
defer logger.Sync()
|
|
136
|
136
|
|
|
137
|
137
|
//rpc call
|
|
|
@@ -143,43 +143,46 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
143
|
143
|
|
|
144
|
144
|
case PM: //Publish msg
|
|
145
|
145
|
//log.Println("PM received")
|
|
146
|
|
- logger.Info("PM received")
|
|
|
146
|
+ fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(PublishMsg).From)
|
|
|
147
|
+ logger.Info("PM received from:[" + fromNodeName + "]")
|
|
147
|
148
|
moscato.queue.push(moscato.preProcessMsg(msg))
|
|
148
|
149
|
|
|
149
|
150
|
case SM: //Subscription msg
|
|
150
|
151
|
//log.Println("SM received")
|
|
151
|
|
- logger.Info("SM received")
|
|
|
152
|
+ fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(SubscriptionMsg).From)
|
|
|
153
|
+ logger.Info("SM received from:[" + fromNodeName + "]")
|
|
152
|
154
|
err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
|
|
153
|
155
|
if err != nil {
|
|
154
|
|
- println(err)
|
|
|
156
|
+ logger.Warn(err.Error())
|
|
155
|
157
|
//return nil, err
|
|
156
|
158
|
}
|
|
157
|
159
|
|
|
158
|
160
|
case RM: //Register msg
|
|
159
|
|
- logger.Info("RM received")
|
|
160
|
161
|
var newMsg RegisterMsg
|
|
161
|
162
|
newMsg = msg.(RegisterMsg)
|
|
|
163
|
+ logger.Info("RM received from:[" + newMsg.From + "]")
|
|
162
|
164
|
|
|
163
|
165
|
newNode := MSNode{newMsg.From, newMsg.From}
|
|
164
|
166
|
resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
|
|
165
|
167
|
if resultAddNode {
|
|
166
|
|
- //log.Println("Node added successful")
|
|
167
|
|
- l4g.LOGGER("Test").Info("Node added successful")
|
|
|
168
|
+ logger.Info("Node added successful")
|
|
168
|
169
|
} else {
|
|
169
|
|
- l4g.LOGGER("Test").Error("Node is already added, ignore RM")
|
|
|
170
|
+ logger.Error("Node is already added, ignore RM")
|
|
170
|
171
|
//log.Println("Node is already added, ignore RM")
|
|
171
|
172
|
return msg, nil
|
|
172
|
173
|
}
|
|
173
|
174
|
|
|
174
|
175
|
addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
|
|
175
|
176
|
moscato.SecureManager.RegKey(newMsg)
|
|
176
|
|
- fmt.Println("Registered microservice: address", addr,
|
|
177
|
|
- "/ key", moscato.SecureManager.GetNodeKey(newMsg.From))
|
|
|
177
|
+ logger.Debug("Registered microservice: address " + addr +
|
|
|
178
|
+ " / key " + strconv.FormatUint(uint64(moscato.SecureManager.GetNodeKey(newMsg.From)), 10))
|
|
178
|
179
|
|
|
179
|
180
|
// ackRM 메세지 전송
|
|
180
|
181
|
go moscato.Send2MS(addr, newMsg)
|
|
181
|
182
|
|
|
182
|
183
|
case WM: //Withdraw msg
|
|
|
184
|
+ fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(RegisterMsg).From)
|
|
|
185
|
+ logger.Info("WM received from:[" + fromNodeName + "]")
|
|
183
|
186
|
//ip := msg.(WithdrawMsg).From
|
|
184
|
187
|
//sublist := moscato.SubscriptionManager.ip2sub[ip]
|
|
185
|
188
|
//fmt.Println("prev list = ", sublist)
|
|
|
@@ -211,6 +214,9 @@ MM→MS
|
|
211
|
214
|
*/
|
|
212
|
215
|
|
|
213
|
216
|
func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
|
|
|
217
|
+ logger := NewMyLogger()
|
|
|
218
|
+ defer logger.Sync()
|
|
|
219
|
+
|
|
214
|
220
|
client, err := rpc.Dial("tcp", ipaddress+":8150")
|
|
215
|
221
|
if err != nil {
|
|
216
|
222
|
fmt.Println(err)
|
|
|
@@ -232,7 +238,7 @@ func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
|
|
232
|
238
|
//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
|
|
233
|
239
|
// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
|
|
234
|
240
|
//log.Println(reply.CompleteLog)
|
|
235
|
|
- //l4g.LOGGER("Test").Info(reply.CompleteLog)
|
|
|
241
|
+ logger.Debug(reply.CompleteLog)
|
|
236
|
242
|
}
|
|
237
|
243
|
|
|
238
|
244
|
//Matching을 용이하게 하기위한 메세지 가공 과정
|
|
|
@@ -278,10 +284,27 @@ func (moscato *Moscato) SendWithEncrypt() MsgUnit {
|
|
278
|
284
|
}
|
|
279
|
285
|
|
|
280
|
286
|
func (moscato *Moscato) Run() {
|
|
|
287
|
+ logger := NewMyLogger()
|
|
|
288
|
+ defer logger.Sync()
|
|
281
|
289
|
|
|
282
|
290
|
config := AppConfig{moscato}
|
|
283
|
291
|
config.config()
|
|
284
|
292
|
|
|
|
293
|
+ sigs := make(chan os.Signal, 1)
|
|
|
294
|
+ done := make(chan bool, 1)
|
|
|
295
|
+
|
|
|
296
|
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
297
|
+
|
|
|
298
|
+ go func() {
|
|
|
299
|
+ sig := <-sigs
|
|
|
300
|
+ //withDraw(message, client)
|
|
|
301
|
+ //fmt.Println(sig)
|
|
|
302
|
+ _ = sig
|
|
|
303
|
+ done <- true
|
|
|
304
|
+ logger.Info("terminate Moscato Message Middleware")
|
|
|
305
|
+ os.Exit(0)
|
|
|
306
|
+ }()
|
|
|
307
|
+
|
|
285
|
308
|
//모스카토 구조체 변수 초기화
|
|
286
|
309
|
receiver := Receiver{moscato: moscato}
|
|
287
|
310
|
err := moscato.queue.queue_init()
|
|
|
@@ -312,12 +335,14 @@ func (moscato *Moscato) Run() {
|
|
312
|
335
|
}
|
|
313
|
336
|
|
|
314
|
337
|
go Listen()
|
|
315
|
|
- color.Blue("initializing complete.")
|
|
|
338
|
+ logger.Info("initializing complete")
|
|
316
|
339
|
|
|
317
|
|
- fmt.Scanln()
|
|
|
340
|
+ <-done
|
|
318
|
341
|
}
|
|
319
|
342
|
|
|
320
|
343
|
func Listen() {
|
|
|
344
|
+ logger := NewMyLogger()
|
|
|
345
|
+ defer logger.Sync()
|
|
321
|
346
|
/*
|
|
322
|
347
|
MS→MM일때 ⇒ port : 8160으로 열기
|
|
323
|
348
|
|
|
|
@@ -327,8 +352,7 @@ func Listen() {
|
|
327
|
352
|
l, err1 := net.Listen("tcp", ":8160")
|
|
328
|
353
|
|
|
329
|
354
|
if err1 != nil {
|
|
330
|
|
- //log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
|
|
331
|
|
- l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
|
|
|
355
|
+ logger.Fatal("Unable to listen on given port: " + err1.Error())
|
|
332
|
356
|
}
|
|
333
|
357
|
defer l.Close()
|
|
334
|
358
|
|