|
|
@@ -4,14 +4,14 @@ import (
|
|
4
|
4
|
"encoding/json"
|
|
5
|
5
|
"errors"
|
|
6
|
6
|
"fmt"
|
|
7
|
|
- l4g "github.com/jeanphorn/log4go"
|
|
8
|
7
|
"net"
|
|
9
|
8
|
"net/rpc"
|
|
|
9
|
+ "os"
|
|
|
10
|
+ "os/signal"
|
|
10
|
11
|
"strconv"
|
|
|
12
|
+ "syscall"
|
|
11
|
13
|
)
|
|
12
|
14
|
|
|
13
|
|
-// TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
|
|
14
|
|
-
|
|
15
|
15
|
type Moscato struct {
|
|
16
|
16
|
queue MsgQueue
|
|
17
|
17
|
SendQueue chan myType
|
|
|
@@ -68,7 +68,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
|
|
68
|
68
|
|
|
69
|
69
|
}
|
|
70
|
70
|
}()
|
|
71
|
|
- reply.CompleteLog = "received"
|
|
|
71
|
+ reply.CompleteLog = "KSM received"
|
|
72
|
72
|
case PM:
|
|
73
|
73
|
var msg PublishMsg
|
|
74
|
74
|
err := json.Unmarshal(args.JsonMsg, &msg)
|
|
|
@@ -143,22 +143,24 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
143
|
143
|
|
|
144
|
144
|
case PM: //Publish msg
|
|
145
|
145
|
//log.Println("PM received")
|
|
146
|
|
- logger.Info("PM received")
|
|
|
146
|
+ fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(PublishMsg).From)
|
|
|
147
|
+ logger.Info("PM received from:[" + fromNodeName + "]")
|
|
147
|
148
|
moscato.queue.push(moscato.preProcessMsg(msg))
|
|
148
|
149
|
|
|
149
|
150
|
case SM: //Subscription msg
|
|
150
|
151
|
//log.Println("SM received")
|
|
151
|
|
- logger.Info("SM received")
|
|
|
152
|
+ fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(SubscriptionMsg).From)
|
|
|
153
|
+ logger.Info("SM received from:[" + fromNodeName + "]")
|
|
152
|
154
|
err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
|
|
153
|
155
|
if err != nil {
|
|
154
|
|
- println(err)
|
|
|
156
|
+ logger.Warn(err.Error())
|
|
155
|
157
|
//return nil, err
|
|
156
|
158
|
}
|
|
157
|
159
|
|
|
158
|
160
|
case RM: //Register msg
|
|
159
|
|
- logger.Info("RM received")
|
|
160
|
161
|
var newMsg RegisterMsg
|
|
161
|
162
|
newMsg = msg.(RegisterMsg)
|
|
|
163
|
+ logger.Info("RM received from:[" + newMsg.From + "]")
|
|
162
|
164
|
|
|
163
|
165
|
newNode := MSNode{newMsg.From, newMsg.From}
|
|
164
|
166
|
resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
|
|
|
@@ -179,6 +181,8 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
179
|
181
|
go moscato.Send2MS(addr, newMsg)
|
|
180
|
182
|
|
|
181
|
183
|
case WM: //Withdraw msg
|
|
|
184
|
+ fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(RegisterMsg).From)
|
|
|
185
|
+ logger.Info("WM received from:[" + fromNodeName + "]")
|
|
182
|
186
|
//ip := msg.(WithdrawMsg).From
|
|
183
|
187
|
//sublist := moscato.SubscriptionManager.ip2sub[ip]
|
|
184
|
188
|
//fmt.Println("prev list = ", sublist)
|
|
|
@@ -286,6 +290,21 @@ func (moscato *Moscato) Run() {
|
|
286
|
290
|
config := AppConfig{moscato}
|
|
287
|
291
|
config.config()
|
|
288
|
292
|
|
|
|
293
|
+ sigs := make(chan os.Signal, 1)
|
|
|
294
|
+ done := make(chan bool, 1)
|
|
|
295
|
+
|
|
|
296
|
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
297
|
+
|
|
|
298
|
+ go func() {
|
|
|
299
|
+ sig := <-sigs
|
|
|
300
|
+ //withDraw(message, client)
|
|
|
301
|
+ //fmt.Println(sig)
|
|
|
302
|
+ _ = sig
|
|
|
303
|
+ done <- true
|
|
|
304
|
+ logger.Info("terminate Moscato Message Middleware")
|
|
|
305
|
+ os.Exit(0)
|
|
|
306
|
+ }()
|
|
|
307
|
+
|
|
289
|
308
|
//모스카토 구조체 변수 초기화
|
|
290
|
309
|
receiver := Receiver{moscato: moscato}
|
|
291
|
310
|
err := moscato.queue.queue_init()
|
|
|
@@ -318,10 +337,12 @@ func (moscato *Moscato) Run() {
|
|
318
|
337
|
go Listen()
|
|
319
|
338
|
logger.Info("initializing complete")
|
|
320
|
339
|
|
|
321
|
|
- fmt.Scanln()
|
|
|
340
|
+ <-done
|
|
322
|
341
|
}
|
|
323
|
342
|
|
|
324
|
343
|
func Listen() {
|
|
|
344
|
+ logger := NewMyLogger()
|
|
|
345
|
+ defer logger.Sync()
|
|
325
|
346
|
/*
|
|
326
|
347
|
MS→MM일때 ⇒ port : 8160으로 열기
|
|
327
|
348
|
|
|
|
@@ -331,8 +352,7 @@ func Listen() {
|
|
331
|
352
|
l, err1 := net.Listen("tcp", ":8160")
|
|
332
|
353
|
|
|
333
|
354
|
if err1 != nil {
|
|
334
|
|
- //log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
|
|
335
|
|
- l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
|
|
|
355
|
+ logger.Fatal("Unable to listen on given port: " + err1.Error())
|
|
336
|
356
|
}
|
|
337
|
357
|
defer l.Close()
|
|
338
|
358
|
|