2 Revīzijas

Autors SHA1 Ziņojums Datums
  kidjung b8d5bb1a89 [add] SIGINT 핸들링 추가 4 gadus atpakaļ
  kidjung 6f89ebadcb [add] init.go 기본적인 로그작업 4 gadus atpakaļ

+ 44
- 20
src/broker/modules/init.go Parādīt failu

4
 	"encoding/json"
4
 	"encoding/json"
5
 	"errors"
5
 	"errors"
6
 	"fmt"
6
 	"fmt"
7
-	"github.com/fatih/color"
8
-	l4g "github.com/jeanphorn/log4go"
9
 	"net"
7
 	"net"
10
 	"net/rpc"
8
 	"net/rpc"
9
+	"os"
10
+	"os/signal"
11
+	"strconv"
12
+	"syscall"
11
 )
13
 )
12
 
14
 
13
-// TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
14
-
15
 type Moscato struct {
15
 type Moscato struct {
16
 	queue               MsgQueue
16
 	queue               MsgQueue
17
 	SendQueue           chan myType
17
 	SendQueue           chan myType
68
 
68
 
69
 			}
69
 			}
70
 		}()
70
 		}()
71
-		reply.CompleteLog = "received"
71
+		reply.CompleteLog = "KSM received"
72
 	case PM:
72
 	case PM:
73
 		var msg PublishMsg
73
 		var msg PublishMsg
74
 		err := json.Unmarshal(args.JsonMsg, &msg)
74
 		err := json.Unmarshal(args.JsonMsg, &msg)
131
 
131
 
132
 //Recieve - MM가 MS로부터 메세지 전달받음
132
 //Recieve - MM가 MS로부터 메세지 전달받음
133
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
133
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
134
-	var logger = newLogger()
134
+	logger := NewMyLogger()
135
 	defer logger.Sync()
135
 	defer logger.Sync()
136
 
136
 
137
 	//rpc call
137
 	//rpc call
143
 
143
 
144
 	case PM: //Publish msg
144
 	case PM: //Publish msg
145
 		//log.Println("PM received")
145
 		//log.Println("PM received")
146
-		logger.Info("PM received")
146
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(PublishMsg).From)
147
+		logger.Info("PM received from:[" + fromNodeName + "]")
147
 		moscato.queue.push(moscato.preProcessMsg(msg))
148
 		moscato.queue.push(moscato.preProcessMsg(msg))
148
 
149
 
149
 	case SM: //Subscription msg
150
 	case SM: //Subscription msg
150
 		//log.Println("SM received")
151
 		//log.Println("SM received")
151
-		logger.Info("SM received")
152
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(SubscriptionMsg).From)
153
+		logger.Info("SM received from:[" + fromNodeName + "]")
152
 		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
154
 		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
153
 		if err != nil {
155
 		if err != nil {
154
-			println(err)
156
+			logger.Warn(err.Error())
155
 			//return nil, err
157
 			//return nil, err
156
 		}
158
 		}
157
 
159
 
158
 	case RM: //Register msg
160
 	case RM: //Register msg
159
-		logger.Info("RM received")
160
 		var newMsg RegisterMsg
161
 		var newMsg RegisterMsg
161
 		newMsg = msg.(RegisterMsg)
162
 		newMsg = msg.(RegisterMsg)
163
+		logger.Info("RM received from:[" + newMsg.From + "]")
162
 
164
 
163
 		newNode := MSNode{newMsg.From, newMsg.From}
165
 		newNode := MSNode{newMsg.From, newMsg.From}
164
 		resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
166
 		resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
165
 		if resultAddNode {
167
 		if resultAddNode {
166
-			//log.Println("Node added successful")
167
-			l4g.LOGGER("Test").Info("Node added successful")
168
+			logger.Info("Node added successful")
168
 		} else {
169
 		} else {
169
-			l4g.LOGGER("Test").Error("Node is already added, ignore RM")
170
+			logger.Error("Node is already added, ignore RM")
170
 			//log.Println("Node is already added, ignore RM")
171
 			//log.Println("Node is already added, ignore RM")
171
 			return msg, nil
172
 			return msg, nil
172
 		}
173
 		}
173
 
174
 
174
 		addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
175
 		addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
175
 		moscato.SecureManager.RegKey(newMsg)
176
 		moscato.SecureManager.RegKey(newMsg)
176
-		fmt.Println("Registered microservice: address", addr,
177
-			"/ key", moscato.SecureManager.GetNodeKey(newMsg.From))
177
+		logger.Debug("Registered microservice: address " + addr +
178
+			" / key " + strconv.FormatUint(uint64(moscato.SecureManager.GetNodeKey(newMsg.From)), 10))
178
 
179
 
179
 		// ackRM 메세지 전송
180
 		// ackRM 메세지 전송
180
 		go moscato.Send2MS(addr, newMsg)
181
 		go moscato.Send2MS(addr, newMsg)
181
 
182
 
182
 	case WM: //Withdraw msg
183
 	case WM: //Withdraw msg
184
+		fromNodeName, _ := moscato.MicroServiceManager.GetIpaddr(msg.(RegisterMsg).From)
185
+		logger.Info("WM received from:[" + fromNodeName + "]")
183
 		//ip := msg.(WithdrawMsg).From
186
 		//ip := msg.(WithdrawMsg).From
184
 		//sublist := moscato.SubscriptionManager.ip2sub[ip]
187
 		//sublist := moscato.SubscriptionManager.ip2sub[ip]
185
 		//fmt.Println("prev list = ", sublist)
188
 		//fmt.Println("prev list = ", sublist)
211
 */
214
 */
212
 
215
 
213
 func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
216
 func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
217
+	logger := NewMyLogger()
218
+	defer logger.Sync()
219
+
214
 	client, err := rpc.Dial("tcp", ipaddress+":8150")
220
 	client, err := rpc.Dial("tcp", ipaddress+":8150")
215
 	if err != nil {
221
 	if err != nil {
216
 		fmt.Println(err)
222
 		fmt.Println(err)
232
 	//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
238
 	//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
233
 	// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
239
 	// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
234
 	//log.Println(reply.CompleteLog)
240
 	//log.Println(reply.CompleteLog)
235
-	//l4g.LOGGER("Test").Info(reply.CompleteLog)
241
+	logger.Debug(reply.CompleteLog)
236
 }
242
 }
237
 
243
 
238
 //Matching을 용이하게 하기위한 메세지 가공 과정
244
 //Matching을 용이하게 하기위한 메세지 가공 과정
278
 }
284
 }
279
 
285
 
280
 func (moscato *Moscato) Run() {
286
 func (moscato *Moscato) Run() {
287
+	logger := NewMyLogger()
288
+	defer logger.Sync()
281
 
289
 
282
 	config := AppConfig{moscato}
290
 	config := AppConfig{moscato}
283
 	config.config()
291
 	config.config()
284
 
292
 
293
+	sigs := make(chan os.Signal, 1)
294
+	done := make(chan bool, 1)
295
+
296
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
297
+
298
+	go func() {
299
+		sig := <-sigs
300
+		//withDraw(message, client)
301
+		//fmt.Println(sig)
302
+		_ = sig
303
+		done <- true
304
+		logger.Info("terminate Moscato Message Middleware")
305
+		os.Exit(0)
306
+	}()
307
+
285
 	//모스카토 구조체 변수 초기화
308
 	//모스카토 구조체 변수 초기화
286
 	receiver := Receiver{moscato: moscato}
309
 	receiver := Receiver{moscato: moscato}
287
 	err := moscato.queue.queue_init()
310
 	err := moscato.queue.queue_init()
312
 	}
335
 	}
313
 
336
 
314
 	go Listen()
337
 	go Listen()
315
-	color.Blue("initializing complete.")
338
+	logger.Info("initializing complete")
316
 
339
 
317
-	fmt.Scanln()
340
+	<-done
318
 }
341
 }
319
 
342
 
320
 func Listen() {
343
 func Listen() {
344
+	logger := NewMyLogger()
345
+	defer logger.Sync()
321
 	/*
346
 	/*
322
 		MS→MM일때 ⇒ port : 8160으로 열기
347
 		MS→MM일때 ⇒ port : 8160으로 열기
323
 
348
 
327
 	l, err1 := net.Listen("tcp", ":8160")
352
 	l, err1 := net.Listen("tcp", ":8160")
328
 
353
 
329
 	if err1 != nil {
354
 	if err1 != nil {
330
-		//log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
331
-		l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
355
+		logger.Fatal("Unable to listen on given port: " + err1.Error())
332
 	}
356
 	}
333
 	defer l.Close()
357
 	defer l.Close()
334
 
358
 

+ 6
- 6
src/broker/modules/logConfig.go Parādīt failu

12
 		LevelKey: "L",
12
 		LevelKey: "L",
13
 		NameKey:  "N",
13
 		NameKey:  "N",
14
 		//CallerKey:      "C",
14
 		//CallerKey:      "C",
15
-		FunctionKey:    zapcore.OmitKey,
16
-		MessageKey:     "M",
17
-		StacktraceKey:  "S",
15
+		FunctionKey: zapcore.OmitKey,
16
+		MessageKey:  "M",
17
+		//StacktraceKey:  "S",
18
 		LineEnding:     zapcore.DefaultLineEnding,
18
 		LineEnding:     zapcore.DefaultLineEnding,
19
-		EncodeLevel:    zapcore.CapitalLevelEncoder,
19
+		EncodeLevel:    zapcore.CapitalColorLevelEncoder,
20
 		EncodeTime:     zapcore.ISO8601TimeEncoder,
20
 		EncodeTime:     zapcore.ISO8601TimeEncoder,
21
 		EncodeDuration: zapcore.StringDurationEncoder,
21
 		EncodeDuration: zapcore.StringDurationEncoder,
22
 		EncodeCaller:   zapcore.ShortCallerEncoder,
22
 		EncodeCaller:   zapcore.ShortCallerEncoder,
23
 	}
23
 	}
24
 }
24
 }
25
 
25
 
26
-func newLogger() *zap.Logger {
26
+func NewMyLogger() *zap.Logger {
27
 	cfg := zap.Config{
27
 	cfg := zap.Config{
28
 		Level:            zap.NewAtomicLevelAt(zap.DebugLevel),
28
 		Level:            zap.NewAtomicLevelAt(zap.DebugLevel),
29
 		Development:      true,
29
 		Development:      true,
30
 		Encoding:         "console",
30
 		Encoding:         "console",
31
 		EncoderConfig:    MyEncoderConfig(),
31
 		EncoderConfig:    MyEncoderConfig(),
32
-		OutputPaths:      []string{"stdout"},
32
+		OutputPaths:      []string{"stdout", "./test.log"},
33
 		ErrorOutputPaths: []string{"stderr"},
33
 		ErrorOutputPaths: []string{"stderr"},
34
 	}
34
 	}
35
 	logger, err := cfg.Build()
35
 	logger, err := cfg.Build()

+ 9
- 9
src/broker/modules/manage.go Parādīt failu

1
 package modules
1
 package modules
2
 
2
 
3
-import (
4
-	"fmt"
5
-	"log"
6
-)
7
-
8
 //각 Microservice에 대한 정보 저장 노드
3
 //각 Microservice에 대한 정보 저장 노드
9
 type MSNode struct {
4
 type MSNode struct {
10
 	nodeName string //Nodename- 현재 데모에서는 IpAddress와 같음
5
 	nodeName string //Nodename- 현재 데모에서는 IpAddress와 같음
23
 
18
 
24
 type NodeManager interface {
19
 type NodeManager interface {
25
 	GetIpaddr(nodeName string) (string, bool) //IpAddress반환
20
 	GetIpaddr(nodeName string) (string, bool) //IpAddress반환
26
-	AddMicroservice(node MSNode) bool //MS추가
27
-	RemoveMicroservice(nodeName string) bool //MS삭제
21
+	AddMicroservice(node MSNode) bool         //MS추가
22
+	RemoveMicroservice(nodeName string) bool  //MS삭제
28
 }
23
 }
29
 
24
 
30
 //모든 Microservice정보 저장
25
 //모든 Microservice정보 저장
34
 
29
 
35
 //MStable 생성자
30
 //MStable 생성자
36
 func NewMStable() *MStable {
31
 func NewMStable() *MStable {
37
-	defer fmt.Println("node manager setting complete.")
32
+	logger := NewMyLogger()
33
+	defer logger.Sync()
34
+
35
+	defer logger.Debug("node manager setting complete.")
38
 	return &MStable{NodeTable: make(map[string]MSNode)}
36
 	return &MStable{NodeTable: make(map[string]MSNode)}
39
 }
37
 }
40
 
38
 
66
 
64
 
67
 //MS삭제
65
 //MS삭제
68
 func (manager *MStable) RemoveMicroservice(nodeName string) bool {
66
 func (manager *MStable) RemoveMicroservice(nodeName string) bool {
67
+	logger := NewMyLogger()
68
+	defer logger.Sync()
69
 	//삭제 전 존재여부 확인
69
 	//삭제 전 존재여부 확인
70
 	_, exists := manager.NodeTable[nodeName] //해당 이름을 가진 Node가 있는지 검색
70
 	_, exists := manager.NodeTable[nodeName] //해당 이름을 가진 Node가 있는지 검색
71
 
71
 
72
 	if exists {
72
 	if exists {
73
 		delete(manager.NodeTable, nodeName) //존재한다면 삭제
73
 		delete(manager.NodeTable, nodeName) //존재한다면 삭제
74
-		log.Println("[" + nodeName + "] : service quit")
74
+		logger.Info("[" + nodeName + "] : service quit")
75
 		return true
75
 		return true
76
 	} else {
76
 	} else {
77
 		return false
77
 		return false

+ 7
- 5
src/broker/modules/queue.go Parādīt failu

2
 
2
 
3
 import (
3
 import (
4
 	"errors"
4
 	"errors"
5
-	"fmt"
6
 )
5
 )
7
 
6
 
8
 type MsgQueue struct {
7
 type MsgQueue struct {
11
 }
10
 }
12
 
11
 
13
 type QueueOperate interface {
12
 type QueueOperate interface {
14
-	queue_init() error	//Queue 초기화 멤버함수
15
-	push(msg Message) bool //Queue push 멤버함수
16
-	pop(wait bool) (Message, error)	//Queue pop 멤버함수 (wait -> busy waiting 여부 결정)
13
+	queue_init() error              //Queue 초기화 멤버함수
14
+	push(msg Message) bool          //Queue push 멤버함수
15
+	pop(wait bool) (Message, error) //Queue pop 멤버함수 (wait -> busy waiting 여부 결정)
17
 }
16
 }
18
 
17
 
19
 //Message Queue를 초기화 해주는 함수
18
 //Message Queue를 초기화 해주는 함수
20
 func (mq *MsgQueue) queue_init() error {
19
 func (mq *MsgQueue) queue_init() error {
20
+	logger := NewMyLogger()
21
+	logger.Sync()
22
+
21
 	if mq.queue != nil && len(mq.queue) != 0 {
23
 	if mq.queue != nil && len(mq.queue) != 0 {
22
 		return errors.New("Queue Hadlerer Error: Already initialized.")
24
 		return errors.New("Queue Hadlerer Error: Already initialized.")
23
 	} else if mq.queue == nil {
25
 	} else if mq.queue == nil {
24
 		mq.queue = make(chan MsgUnit, 1000)
26
 		mq.queue = make(chan MsgUnit, 1000)
25
 		//log.Println("queue is initialized.")
27
 		//log.Println("queue is initialized.")
26
-		fmt.Println("queue is initialized.")
28
+		logger.Debug("queue is initialized")
27
 		return nil
29
 		return nil
28
 	} else {
30
 	} else {
29
 		close(mq.queue)
31
 		close(mq.queue)

+ 6
- 3
src/broker/modules/secure.go Parādīt failu

2
 
2
 
3
 import (
3
 import (
4
 	"fmt"
4
 	"fmt"
5
-	"log"
6
 	"strconv"
5
 	"strconv"
7
 )
6
 )
8
 
7
 
13
 
12
 
14
 //Security 생성자
13
 //Security 생성자
15
 func NewSecurity() *Security {
14
 func NewSecurity() *Security {
15
+	logger := NewMyLogger()
16
+	defer logger.Sync()
16
 	security := &Security{map[string]string{}}
17
 	security := &Security{map[string]string{}}
17
-	fmt.Println("security setting complete.")
18
+	defer logger.Debug("security setting complete.")
18
 	return security
19
 	return security
19
 }
20
 }
20
 
21
 
86
 
87
 
87
 //Key제거 함수
88
 //Key제거 함수
88
 func (sc *Security) RemoveSecureKey(nodeName string) bool {
89
 func (sc *Security) RemoveSecureKey(nodeName string) bool {
90
+	logger := NewMyLogger()
91
+	defer logger.Sync()
89
 	//삭제 전 존재여부 확인
92
 	//삭제 전 존재여부 확인
90
 	_, exists := sc.KeyMap[nodeName]
93
 	_, exists := sc.KeyMap[nodeName]
91
 
94
 
92
 	if exists {
95
 	if exists {
93
 		delete(sc.KeyMap, nodeName)
96
 		delete(sc.KeyMap, nodeName)
94
-		log.Println("[" + nodeName + "] : delete Key successful")
97
+		logger.Debug("[" + nodeName + "] : delete Key successful")
95
 		return true
98
 		return true
96
 	} else {
99
 	} else {
97
 		return false
100
 		return false

Notiek ielāde…
Atcelt
Saglabāt