2 Commits

Author SHA1 Message Date
  kidjung d02a70a59e Merge remote-tracking branch 'origin/master' 4 years ago
  kidjung 0dfe8bd517 [add] log4go 테스트 4 years ago
2 changed files with 66 additions and 38 deletions
  1. 17
    0
      .idea/vcs.xml
  2. 49
    38
      src/broker/modules/init.go

+ 17
- 0
.idea/vcs.xml View File

@@ -2,14 +2,31 @@
2 2
 <project version="4">
3 3
   <component name="VcsDirectoryMappings">
4 4
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
5
+    <mapping directory="$PROJECT_DIR$/src/github.com/apex/log" vcs="Git" />
6
+    <mapping directory="$PROJECT_DIR$/src/github.com/benbjohnson/clock" vcs="Git" />
5 7
     <mapping directory="$PROJECT_DIR$/src/github.com/bmizerany/assert" vcs="Git" />
6 8
     <mapping directory="$PROJECT_DIR$/src/github.com/davecgh/go-spew" vcs="Git" />
9
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-kit/kit" vcs="Git" />
10
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-logfmt/logfmt" vcs="Git" />
11
+    <mapping directory="$PROJECT_DIR$/src/github.com/go-stack/stack" vcs="Git" />
12
+    <mapping directory="$PROJECT_DIR$/src/github.com/jeanphorn/log4go" vcs="Git" />
7 13
     <mapping directory="$PROJECT_DIR$/src/github.com/juliangruber/go-intersect" vcs="Git" />
8 14
     <mapping directory="$PROJECT_DIR$/src/github.com/kr/pretty" vcs="Git" />
9 15
     <mapping directory="$PROJECT_DIR$/src/github.com/kr/text" vcs="Git" />
16
+    <mapping directory="$PROJECT_DIR$/src/github.com/pkg/errors" vcs="Git" />
10 17
     <mapping directory="$PROJECT_DIR$/src/github.com/pmezard/go-difflib" vcs="Git" />
11 18
     <mapping directory="$PROJECT_DIR$/src/github.com/rogpeppe/go-internal" vcs="Git" />
19
+    <mapping directory="$PROJECT_DIR$/src/github.com/rs/zerolog" vcs="Git" />
20
+    <mapping directory="$PROJECT_DIR$/src/github.com/sirupsen/logrus" vcs="Git" />
12 21
     <mapping directory="$PROJECT_DIR$/src/github.com/stretchr/testify" vcs="Git" />
22
+    <mapping directory="$PROJECT_DIR$/src/github.com/toolkits/file" vcs="Git" />
23
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/atomic" vcs="Git" />
24
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/goleak" vcs="Git" />
25
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/multierr" vcs="Git" />
26
+    <mapping directory="$PROJECT_DIR$/src/go.uber.org/zap" vcs="Git" />
27
+    <mapping directory="$PROJECT_DIR$/src/google.golang.org/grpc" vcs="Git" />
28
+    <mapping directory="$PROJECT_DIR$/src/gopkg.in/inconshreveable/log15.v2" vcs="Git" />
29
+    <mapping directory="$PROJECT_DIR$/src/gopkg.in/yaml.v2" vcs="Git" />
13 30
     <mapping directory="$PROJECT_DIR$/src/gopkg.in/yaml.v3" vcs="Git" />
14 31
   </component>
15 32
 </project>

+ 49
- 38
src/broker/modules/init.go View File

@@ -1,11 +1,11 @@
1 1
 package modules
2 2
 
3 3
 import (
4
-	"broker/modules/github.com/fatih/color"
5 4
 	"encoding/json"
6 5
 	"errors"
7 6
 	"fmt"
8
-	"log"
7
+	"github.com/fatih/color"
8
+	l4g "github.com/jeanphorn/log4go"
9 9
 	"net"
10 10
 	"net/rpc"
11 11
 )
@@ -13,8 +13,8 @@ import (
13 13
 // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
14 14
 
15 15
 type Moscato struct {
16
-	queue               MsgQueue //메세지 받는 큐
17
-	SendQueue           chan myType //Publish Message가 들어왔을때 매칭이 된 결과물이 담김
16
+	queue               MsgQueue
17
+	SendQueue           chan myType
18 18
 	MicroServiceManager NodeManager `inject:""`
19 19
 	MatchingManager     match_manager
20 20
 	SubscriptionManager sub_manager
@@ -22,8 +22,8 @@ type Moscato struct {
22 22
 }
23 23
 
24 24
 type myType struct {
25
-	subList []string //sub의 ip주소를 가지고 있음
26
-	pubMsg  MsgUnit //Publish Message
25
+	subList []string
26
+	pubMsg  MsgUnit
27 27
 	err     error
28 28
 }
29 29
 
@@ -36,13 +36,11 @@ type Receiver struct { //RPC 서버에 등록하기 위한 변수
36 36
 }
37 37
 
38 38
 type Args struct { // 매개변수
39
-	JsonMsg []byte //JSON 타입으로 바뀐 메세지
40
-	Kind    int //메세지 종류
39
+	JsonMsg []byte
40
+	Kind    int
41 41
 }
42 42
 
43 43
 /*
44
-MM:MessageMiddleWare
45
-MS:MicroService
46 44
 MS→MM
47 45
 
48 46
 -MM 실행되면 MM서버는 열려있음(MS은 자동으로 Client)(포트 8150)
@@ -52,13 +50,13 @@ MS→MM
52 50
 -MmReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MM의 Receive로 보냄
53 51
 
54 52
 -MM의 Receive에서 해당 Message를 처리
55
- */
53
+*/
56 54
 
57
-func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
58
-	// 메세지 타입별로 나눠서 언마샬
55
+func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
56
+	// 메세지 별로 나눠서 언마샬
59 57
 	switch args.Kind {
60 58
 
61
-	case KSM://KeyShareMessage
59
+	case KSM:
62 60
 		var msg KeyShareMsg
63 61
 		err := json.Unmarshal(args.JsonMsg, &msg)
64 62
 		if err != nil {
@@ -71,7 +69,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
71 69
 			}
72 70
 		}()
73 71
 		reply.CompleteLog = "received"
74
-	case PM://PublishMessage
72
+	case PM:
75 73
 		var msg PublishMsg
76 74
 		err := json.Unmarshal(args.JsonMsg, &msg)
77 75
 		if err != nil {
@@ -85,7 +83,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
85 83
 			}
86 84
 		}()
87 85
 		reply.CompleteLog = "PM received"
88
-	case SM://SubscriptionMessage
86
+	case SM:
89 87
 		var msg SubscriptionMsg
90 88
 		err := json.Unmarshal(args.JsonMsg, &msg)
91 89
 		if err != nil {
@@ -98,7 +96,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
98 96
 			}
99 97
 		}()
100 98
 		reply.CompleteLog = "SM received"
101
-	case RM://RegisterMessage
99
+	case RM:
102 100
 		var msg RegisterMsg
103 101
 		err := json.Unmarshal(args.JsonMsg, &msg)
104 102
 		if err != nil {
@@ -111,7 +109,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
111 109
 			}
112 110
 		}()
113 111
 		reply.CompleteLog = "RM received"
114
-	case WM://WithdrawMessage
112
+	case WM:
115 113
 		var msg WithdrawMsg
116 114
 		err := json.Unmarshal(args.JsonMsg, &msg)
117 115
 		if err != nil {
@@ -124,8 +122,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
124 122
 			}
125 123
 		}()
126 124
 		reply.CompleteLog = "WM received"
127
-	default://MM이 받을 타입의 메세지가 아닌 경우
128
-		reply.CompleteLog="This type is inappropriate"
125
+	default:
129 126
 		return errors.New("message type Error: Not registered message type")
130 127
 	}
131 128
 	//reply.CompleteLog = "received completely"
@@ -133,34 +130,43 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
133 130
 }
134 131
 //Recieve - MM가 MS로부터 메세지 전달받음
135 132
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
133
+	l4g.LoadConfiguration("modules/logConfig.json")
134
+	//rpc call
136 135
 	var msg_type = msg.CheckType()
137 136
 	//메세지 타입에 따라 다르게 처리
138 137
 	switch msg_type {
139 138
 
140
-	case KSM: //KeyShareMessage - 추후 구현
139
+	case KSM: //Key share msg
141 140
 
142
-	case PM: //PublishMessage
143
-		log.Println("PM received")
141
+	case PM: //Publish msg
142
+		//log.Println("PM received")
143
+		l4g.LOGGER("Test").Info("PM received")
144 144
 		moscato.queue.push(moscato.preProcessMsg(msg))
145 145
 
146
-	case SM: //SubscriptionMessage
147
-		log.Println("SM received")
146
+	case SM: //Subscription msg
147
+		//log.Println("SM received")
148
+		l4g.LOGGER("Test").Info("SM received")
148 149
 		err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
149 150
 		if err != nil {
150 151
 			println(err)
152
+			//return nil, err
151 153
 		}
152 154
 
153
-	case RM: //RegisterMessage
154
-		log.Println("RM received")
155
+	case RM: //Register msg
156
+		//log.Println("RM received")
157
+		l4g.LOGGER("Test").Info("RM received")
158
+		l4g.LOGGER("Test").Debug("RM received")
155 159
 		var newMsg RegisterMsg
156 160
 		newMsg = msg.(RegisterMsg)
157 161
 
158 162
 		newNode := MSNode{newMsg.From, newMsg.From}
159 163
 		resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
160 164
 		if resultAddNode {
161
-			log.Println("Node added successful")
165
+			//log.Println("Node added successful")
166
+			l4g.LOGGER("Test").Info("Node added successful")
162 167
 		} else {
163
-			log.Println("Node is already added, ignore RM")
168
+			l4g.LOGGER("Test").Error("Node is already added, ignore RM")
169
+			//log.Println("Node is already added, ignore RM")
164 170
 			return msg, nil
165 171
 		}
166 172
 
@@ -173,13 +179,16 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
173 179
 		go moscato.Send2MS(addr, newMsg)
174 180
 
175 181
 	case WM: //Withdraw msg
176
-		ip := msg.(WithdrawMsg).From
182
+		//ip := msg.(WithdrawMsg).From
183
+		//sublist := moscato.SubscriptionManager.ip2sub[ip]
184
+		//fmt.Println("prev list = ", sublist)
177 185
 		moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
178 186
 		moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
179
-		moscato.SubscriptionManager.delete(ip) //해당 Ip주소를 가진 Subscription 정보 삭제
187
+		//moscato.SubscriptionManager.delete(ip)
188
+		//sublist2 := moscato.SubscriptionManager.ip2sub[ip]
189
+		//fmt.Println("after list =", sublist2)
180 190
 
181
-	default://MM이 받을 메세지 타입이 아닌경우
182
-		log.Println("Message type Error: Not registered message type")
191
+	default:
183 192
 		return nil, errors.New("Message type Error: Not registered message type")
184 193
 	}
185 194
 
@@ -201,6 +210,7 @@ MM→MS
201 210
 */
202 211
 
203 212
 func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
213
+	l4g.LoadConfiguration("logConfig.json")
204 214
 	client, err := rpc.Dial("tcp", ipaddress+":8150")
205 215
 	if err != nil {
206 216
 		fmt.Println(err)
@@ -221,7 +231,8 @@ func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
221 231
 	}
222 232
 	//log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
223 233
 	// 마이크로 서비스에게 받은 메시지는 노란색으로 출력
224
-	log.Println(reply.CompleteLog)
234
+	//log.Println(reply.CompleteLog)
235
+	l4g.LOGGER("Test").Info(reply.CompleteLog)
225 236
 }
226 237
 
227 238
 //Matching을 용이하게 하기위한 메세지 가공 과정
@@ -286,7 +297,6 @@ func (moscato *Moscato) Run() {
286 297
 	go func() {
287 298
 		for {
288 299
 			msg := moscato.queue.pop(true)
289
-			fmt.Println(msg)
290 300
 			go moscato.Matching(msg)
291 301
 			go moscato.SendWithEncrypt()
292 302
 		}
@@ -297,7 +307,7 @@ func (moscato *Moscato) Run() {
297 307
 	//rpc 등록 -> Receive 함수
298 308
 	err = rpc.Register(receiver)
299 309
 	if err != nil {
300
-		log.Println(err)
310
+		println(err)
301 311
 		return
302 312
 	}
303 313
 
@@ -317,7 +327,8 @@ func Listen() {
317 327
 	l, err1 := net.Listen("tcp", ":8160")
318 328
 
319 329
 	if err1 != nil {
320
-		log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
330
+		//log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
331
+		l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
321 332
 	}
322 333
 	defer l.Close()
323 334
 
@@ -325,4 +336,4 @@ func Listen() {
325 336
 		conn, _ := l.Accept()
326 337
 		go rpc.ServeConn(conn)
327 338
 	}
328
-}
339
+}

Loading…
Cancel
Save