2 Commits

Autor SHA1 Nachricht Datum
  kidjung b8d5bb1a89 [add] SIGINT 핸들링 추가 vor 4 Jahren
  kidjung 6f89ebadcb [add] init.go 기본적인 로그작업 vor 4 Jahren

+ 44
- 20
src/broker/modules/init.go Datei anzeigen

@@ -4,14 +4,14 @@ import (
4 4
 	"encoding/json"
5 5
 	"errors"
6 6
 	"fmt"
7
-	"github.com/fatih/color"
8
-	l4g "github.com/jeanphorn/log4go"
9 7
 	"net"
10 8
 	"net/rpc"
9
+	"os"
10
+	"os/signal"
11
+	"strconv"
12
+	"syscall"
11 13
 )
12 14
 
13
-// TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
14
-
15 15
 type Moscato struct {
16 16
 	queue               MsgQueue
17 17
 	SendQueue           chan myType
@@ -68,7 +68,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
68 68
 
69 69
 			}
70 70
 		}()
71
-		reply.CompleteLog = "received"
71
+		reply.CompleteLog = "KSM received"
72 72
 	case PM:
73 73
 		var msg PublishMsg
74 74
 		err := json.Unmarshal(args.JsonMsg, &msg)
@@ -131,7 +131,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
131 131
 
132 132
 //Recieve - MM가 MS로부터 메세지 전달받음
133 133
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
134
-	var logger = newLogger()
134
+	logger := NewMyLogger()
135 135
 	defer logger.Sync()
136 136
 
137 137
 	//rpc call
@@ -143,43 +143,46 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
143 143
 
144 144
 	case PM: //Publish msg
145 145
 		//log.Println("PM received")
146
-		logger.Info("PM received")
146
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(PublishMsg).From)
147
+		logger.Info("PM received from:[" + fromNodeName + "]")
147 148
 		moscato.queue.push(moscato.preProcessMsg(msg))
148 149
 
149 150
 	case SM: //Subscription msg
150 151
 		//log.Println("SM received")
151
-		logger.Info("SM received")
152
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(SubscriptionMsg).From)
153
+		logger.Info("SM received from:[" + fromNodeName + "]")
152 154
 		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
153 155
 		if err != nil {
154
-			println(err)
156
+			logger.Warn(err.Error())
155 157
 			//return nil, err
156 158
 		}
157 159
 
158 160
 	case RM: //Register msg
159
-		logger.Info("RM received")
160 161
 		var newMsg RegisterMsg
161 162
 		newMsg = msg.(RegisterMsg)
163
+		logger.Info("RM received from:[" + newMsg.From + "]")
162 164
 
163 165
 		newNode := MSNode{newMsg.From, newMsg.From}
164 166
 		resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
165 167
 		if resultAddNode {
166
-			//log.Println("Node added successful")
167
-			l4g.LOGGER("Test").Info("Node added successful")
168
+			logger.Info("Node added successful")
168 169
 		} else {
169
-			l4g.LOGGER("Test").Error("Node is already added, ignore RM")
170
+			logger.Error("Node is already added, ignore RM")
170 171
 			//log.Println("Node is already added, ignore RM")
171 172
 			return msg, nil
172 173
 		}
173 174
 
174 175
 		addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
175 176
 		moscato.SecureManager.RegKey(newMsg)
176
-		fmt.Println("Registered microservice: address", addr,
177
-			"/ key", moscato.SecureManager.GetNodeKey(newMsg.From))
177
+		logger.Debug("Registered microservice: address " + addr +
178
+			" / key " + strconv.FormatUint(uint64(moscato.SecureManager.GetNodeKey(newMsg.From)), 10))
178 179
 
179 180
 		// ackRM 메세지 전송
180 181
 		go moscato.Send2MS(addr, newMsg)
181 182
 
182 183
 	case WM: //Withdraw msg
184
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(RegisterMsg).From)
185
+		logger.Info("WM received from:[" + fromNodeName + "]")
183 186
 		//ip := msg.(WithdrawMsg).From
184 187
 		//sublist := moscato.SubscriptionManager.ip2sub[ip]
185 188
 		//fmt.Println("prev list = ", sublist)
@@ -211,6 +214,9 @@ MM→MS
211 214
 */
212 215
 
213 216
 func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
217
+	logger := NewMyLogger()
218
+	defer logger.Sync()
219
+
214 220
 	client, err := rpc.Dial("tcp", ipaddress+":8150")
215 221
 	if err != nil {
216 222
 		fmt.Println(err)
@@ -232,7 +238,7 @@ func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
232 238
 	//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
233 239
 	// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
234 240
 	//log.Println(reply.CompleteLog)
235
-	//l4g.LOGGER("Test").Info(reply.CompleteLog)
241
+	logger.Debug(reply.CompleteLog)
236 242
 }
237 243
 
238 244
 //Matching을 용이하게 하기위한 메세지 가공 과정
@@ -278,10 +284,27 @@ func (moscato *Moscato) SendWithEncrypt() MsgUnit {
278 284
 }
279 285
 
280 286
 func (moscato *Moscato) Run() {
287
+	logger := NewMyLogger()
288
+	defer logger.Sync()
281 289
 
282 290
 	config := AppConfig{moscato}
283 291
 	config.config()
284 292
 
293
+	sigs := make(chan os.Signal, 1)
294
+	done := make(chan bool, 1)
295
+
296
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
297
+
298
+	go func() {
299
+		sig := <-sigs
300
+		//withDraw(message, client)
301
+		//fmt.Println(sig)
302
+		_ = sig
303
+		done <- true
304
+		logger.Info("terminate Moscato Message Middleware")
305
+		os.Exit(0)
306
+	}()
307
+
285 308
 	//모스카토 구조체 변수 초기화
286 309
 	receiver := Receiver{moscato: moscato}
287 310
 	err := moscato.queue.queue_init()
@@ -312,12 +335,14 @@ func (moscato *Moscato) Run() {
312 335
 	}
313 336
 
314 337
 	go Listen()
315
-	color.Blue("initializing complete.")
338
+	logger.Info("initializing complete")
316 339
 
317
-	fmt.Scanln()
340
+	<-done
318 341
 }
319 342
 
320 343
 func Listen() {
344
+	logger := NewMyLogger()
345
+	defer logger.Sync()
321 346
 	/*
322 347
 		MS→MM일때 ⇒ port : 8160으로 열기
323 348
 
@@ -327,8 +352,7 @@ func Listen() {
327 352
 	l, err1 := net.Listen("tcp", ":8160")
328 353
 
329 354
 	if err1 != nil {
330
-		//log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
331
-		l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
355
+		logger.Fatal("Unable to listen on given port: " + err1.Error())
332 356
 	}
333 357
 	defer l.Close()
334 358
 

+ 6
- 6
src/broker/modules/logConfig.go Datei anzeigen

@@ -12,24 +12,24 @@ func MyEncoderConfig() zapcore.EncoderConfig {
12 12
 		LevelKey: "L",
13 13
 		NameKey:  "N",
14 14
 		//CallerKey:      "C",
15
-		FunctionKey:    zapcore.OmitKey,
16
-		MessageKey:     "M",
17
-		StacktraceKey:  "S",
15
+		FunctionKey: zapcore.OmitKey,
16
+		MessageKey:  "M",
17
+		//StacktraceKey:  "S",
18 18
 		LineEnding:     zapcore.DefaultLineEnding,
19
-		EncodeLevel:    zapcore.CapitalLevelEncoder,
19
+		EncodeLevel:    zapcore.CapitalColorLevelEncoder,
20 20
 		EncodeTime:     zapcore.ISO8601TimeEncoder,
21 21
 		EncodeDuration: zapcore.StringDurationEncoder,
22 22
 		EncodeCaller:   zapcore.ShortCallerEncoder,
23 23
 	}
24 24
 }
25 25
 
26
-func newLogger() *zap.Logger {
26
+func NewMyLogger() *zap.Logger {
27 27
 	cfg := zap.Config{
28 28
 		Level:            zap.NewAtomicLevelAt(zap.DebugLevel),
29 29
 		Development:      true,
30 30
 		Encoding:         "console",
31 31
 		EncoderConfig:    MyEncoderConfig(),
32
-		OutputPaths:      []string{"stdout"},
32
+		OutputPaths:      []string{"stdout", "./test.log"},
33 33
 		ErrorOutputPaths: []string{"stderr"},
34 34
 	}
35 35
 	logger, err := cfg.Build()

+ 9
- 9
src/broker/modules/manage.go Datei anzeigen

@@ -1,10 +1,5 @@
1 1
 package modules
2 2
 
3
-import (
4
-	"fmt"
5
-	"log"
6
-)
7
-
8 3
 //각 Microservice에 대한 정보 저장 노드
9 4
 type MSNode struct {
10 5
 	nodeName string //Nodename- 현재 데모에서는 IpAddress와 같음
@@ -23,8 +18,8 @@ func (node *MSNode) GetIpaddr() string {
23 18
 
24 19
 type NodeManager interface {
25 20
 	GetIpaddr(nodeName string) (string, bool) //IpAddress반환
26
-	AddMicroservice(node MSNode) bool //MS추가
27
-	RemoveMicroservice(nodeName string) bool //MS삭제
21
+	AddMicroservice(node MSNode) bool         //MS추가
22
+	RemoveMicroservice(nodeName string) bool  //MS삭제
28 23
 }
29 24
 
30 25
 //모든 Microservice정보 저장
@@ -34,7 +29,10 @@ type MStable struct {
34 29
 
35 30
 //MStable 생성자
36 31
 func NewMStable() *MStable {
37
-	defer fmt.Println("node manager setting complete.")
32
+	logger := NewMyLogger()
33
+	defer logger.Sync()
34
+
35
+	defer logger.Debug("node manager setting complete.")
38 36
 	return &MStable{NodeTable: make(map[string]MSNode)}
39 37
 }
40 38
 
@@ -66,12 +64,14 @@ func (manager *MStable) AddMicroservice(node MSNode) bool {
66 64
 
67 65
 //MS삭제
68 66
 func (manager *MStable) RemoveMicroservice(nodeName string) bool {
67
+	logger := NewMyLogger()
68
+	defer logger.Sync()
69 69
 	//삭제 전 존재여부 확인
70 70
 	_, exists := manager.NodeTable[nodeName] //해당 이름을 가진 Node가 있는지 검색
71 71
 
72 72
 	if exists {
73 73
 		delete(manager.NodeTable, nodeName) //존재한다면 삭제
74
-		log.Println("[" + nodeName + "] : service quit")
74
+		logger.Info("[" + nodeName + "] : service quit")
75 75
 		return true
76 76
 	} else {
77 77
 		return false

+ 7
- 5
src/broker/modules/queue.go Datei anzeigen

@@ -2,7 +2,6 @@ package modules
2 2
 
3 3
 import (
4 4
 	"errors"
5
-	"fmt"
6 5
 )
7 6
 
8 7
 type MsgQueue struct {
@@ -11,19 +10,22 @@ type MsgQueue struct {
11 10
 }
12 11
 
13 12
 type QueueOperate interface {
14
-	queue_init() error	//Queue 초기화 멤버함수
15
-	push(msg Message) bool //Queue push 멤버함수
16
-	pop(wait bool) (Message, error)	//Queue pop 멤버함수 (wait -> busy waiting 여부 결정)
13
+	queue_init() error              //Queue 초기화 멤버함수
14
+	push(msg Message) bool          //Queue push 멤버함수
15
+	pop(wait bool) (Message, error) //Queue pop 멤버함수 (wait -> busy waiting 여부 결정)
17 16
 }
18 17
 
19 18
 //Message Queue를 초기화 해주는 함수
20 19
 func (mq *MsgQueue) queue_init() error {
20
+	logger := NewMyLogger()
21
+	logger.Sync()
22
+
21 23
 	if mq.queue != nil && len(mq.queue) != 0 {
22 24
 		return errors.New("Queue Hadlerer Error: Already initialized.")
23 25
 	} else if mq.queue == nil {
24 26
 		mq.queue = make(chan MsgUnit, 1000)
25 27
 		//log.Println("queue is initialized.")
26
-		fmt.Println("queue is initialized.")
28
+		logger.Debug("queue is initialized")
27 29
 		return nil
28 30
 	} else {
29 31
 		close(mq.queue)

+ 6
- 3
src/broker/modules/secure.go Datei anzeigen

@@ -2,7 +2,6 @@ package modules
2 2
 
3 3
 import (
4 4
 	"fmt"
5
-	"log"
6 5
 	"strconv"
7 6
 )
8 7
 
@@ -13,8 +12,10 @@ type Security struct {
13 12
 
14 13
 //Security 생성자
15 14
 func NewSecurity() *Security {
15
+	logger := NewMyLogger()
16
+	defer logger.Sync()
16 17
 	security := &Security{map[string]string{}}
17
-	fmt.Println("security setting complete.")
18
+	defer logger.Debug("security setting complete.")
18 19
 	return security
19 20
 }
20 21
 
@@ -86,12 +87,14 @@ func (sc Security) ReEncPubMsg(fromPubMsg PublishMsg, nodeName string) PublishMs
86 87
 
87 88
 //Key제거 함수
88 89
 func (sc *Security) RemoveSecureKey(nodeName string) bool {
90
+	logger := NewMyLogger()
91
+	defer logger.Sync()
89 92
 	//삭제 전 존재여부 확인
90 93
 	_, exists := sc.KeyMap[nodeName]
91 94
 
92 95
 	if exists {
93 96
 		delete(sc.KeyMap, nodeName)
94
-		log.Println("[" + nodeName + "] : delete Key successful")
97
+		logger.Debug("[" + nodeName + "] : delete Key successful")
95 98
 		return true
96 99
 	} else {
97 100
 		return false

Laden…
Abbrechen
Speichern