Pārlūkot izejas kodu

[revise] 마이크로 서비스 등록을 위한 Manage.go 수정(인터페이스 생성과 생성자 구현)과 init.go 수정

master
kidjung 4 gadus atpakaļ
vecāks
revīzija
3f06d313bf

+ 46
- 31
src/broker/modules/init.go Parādīt failu

@@ -9,6 +9,7 @@ import (
9 9
 	"net/rpc"
10 10
 )
11 11
 
12
+// TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현
12 13
 //temporary type for matching manager
13 14
 //type match_manager struct{}
14 15
 
@@ -25,10 +26,10 @@ func (match_mng *match_manager) add_subscription(msg MsgUnit) {
25 26
 type secure_manager struct{}
26 27
 
27 28
 type Moscato struct {
28
-	queue      MsgQueue
29
-	ms_mng     MStable
30
-	match_mng  match_manager
31
-	secure_mng secure_manager
29
+	queue               MsgQueue
30
+	MicroServiceManager NodeManager
31
+	match_mng           match_manager
32
+	secure_mng          secure_manager
32 33
 }
33 34
 
34 35
 type Reply struct {
@@ -50,7 +51,7 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
50 51
 
51 52
 	case KSM:
52 53
 		var msg KeyShareMsg
53
-		err:=json.Unmarshal(args.JsonMsg,&msg)
54
+		err := json.Unmarshal(args.JsonMsg, &msg)
54 55
 		if err != nil {
55 56
 			return err
56 57
 		}
@@ -63,20 +64,21 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
63 64
 		reply.CompleteLog = "received completely"
64 65
 	case PM:
65 66
 		var msg PublishMsg
66
-		err:=json.Unmarshal(args.JsonMsg,&msg)
67
+		err := json.Unmarshal(args.JsonMsg, &msg)
67 68
 		if err != nil {
68 69
 			return err
69 70
 		}
71
+
70 72
 		go func() {
71 73
 			_, err := receiver.moscato.Receive(msg)
72 74
 			if err != nil {
73 75
 
74 76
 			}
75 77
 		}()
76
-		reply.CompleteLog = "received completely"
78
+		reply.CompleteLog = "PM received completely"
77 79
 	case SM:
78 80
 		var msg SubscriptionMsg
79
-		err:=json.Unmarshal(args.JsonMsg,&msg)
81
+		err := json.Unmarshal(args.JsonMsg, &msg)
80 82
 		if err != nil {
81 83
 			return err
82 84
 		}
@@ -89,20 +91,20 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
89 91
 		reply.CompleteLog = "received completely"
90 92
 	case RM:
91 93
 		var msg RegisterMsg
92
-		err:=json.Unmarshal(args.JsonMsg,&msg)
94
+		err := json.Unmarshal(args.JsonMsg, &msg)
93 95
 		if err != nil {
94 96
 			return err
95 97
 		}
96 98
 		go func() {
97 99
 			_, err := receiver.moscato.Receive(msg)
98 100
 			if err != nil {
99
-
101
+				fmt.Println(err)
100 102
 			}
101 103
 		}()
102
-		reply.CompleteLog = "received completely"
104
+		reply.CompleteLog = "RM received completely"
103 105
 	case WM:
104 106
 		var msg WithdrawMsg
105
-		err:=json.Unmarshal(args.JsonMsg,&msg)
107
+		err := json.Unmarshal(args.JsonMsg, &msg)
106 108
 		if err != nil {
107 109
 			return err
108 110
 		}
@@ -114,9 +116,9 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
114 116
 		}()
115 117
 		reply.CompleteLog = "received completely"
116 118
 	default:
117
-		return errors.New("Message type Error: Not registered message type")
119
+		return errors.New("message type Error: Not registered message type")
118 120
 	}
119
-	reply.CompleteLog = "received completely"
121
+	//reply.CompleteLog = "received completely"
120 122
 	return nil
121 123
 }
122 124
 
@@ -132,18 +134,25 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
132 134
 	case PM: //Publish msg
133 135
 		moscato.queue.push(msg.(PublishMsg))
134 136
 		log.Println("popped queue: ", moscato.queue.pop(false))
137
+		log.Println("PM received")
135 138
 
136 139
 	case SM: //Subscription msg
137 140
 		moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
138 141
 
139 142
 	case RM: //Register msg
140
-		//var newmsg RegisterMsg
141
-		var newmsg = msg.(*RegisterMsg)
142
-		newNode := MSnode{newmsg.From, newmsg.From}
143
-		moscato.ms_mng.add_microservice(newNode)
143
+		var newMsg RegisterMsg
144
+		newMsg = msg.(RegisterMsg)
145
+
146
+		newNode := MSNode{newMsg.From, newMsg.From}
147
+		moscato.MicroServiceManager.AddMicroservice(newNode)
148
+
149
+		addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
150
+		fmt.Println(addr)
151
+
152
+		log.Println("RM received")
144 153
 
145 154
 	case WM: //Withdraw msg
146
-		moscato.ms_mng.remove_microservice(msg.(*WithdrawMsg).From)
155
+		moscato.MicroServiceManager.RemoveMicroservice(msg.(*WithdrawMsg).From)
147 156
 
148 157
 	default:
149 158
 		return nil, errors.New("Message type Error: Not registered message type")
@@ -153,38 +162,44 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
153 162
 }
154 163
 
155 164
 //Ms로 보낼때 쓸 함수
156
-func (moscato *Moscato)Send2MS(ipaddress string,msg MsgUnit){
157
-	client,err:=rpc.Dial("tcp",ipaddress+":8150")
158
-	if err!=nil{
165
+func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
166
+	client, err := rpc.Dial("tcp", ipaddress+":8150")
167
+	if err != nil {
159 168
 		fmt.Println(err)
160 169
 		return
161 170
 	}
162 171
 	defer client.Close()
163 172
 
164
-	reply:=new(Reply)
165
-	jmsg,_:=msg.ConvertToJson()
166
-	args:=Args{
173
+	reply := new(Reply)
174
+	jmsg, _ := msg.ConvertToJson()
175
+	args := Args{
167 176
 		JsonMsg: jmsg,
168 177
 		Kind:    msg.CheckType(),
169 178
 	}
170
-	err:=client.Call("receiver.MsReceive",args,reply)
171
-	if err !=nil{
179
+	err = client.Call("receiver.MsReceive", args, reply)
180
+	if err != nil {
172 181
 		fmt.Println(err)
173 182
 		return
174 183
 	}
175
-	fmt.Println(reply.CompleteLog)//잘 받았는지 확인 해줌
184
+	fmt.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
176 185
 }
177 186
 
178 187
 func (moscato *Moscato) Run() {
179 188
 	//모스카토 구조체 변수 초기화
180 189
 	receiver := Receiver{moscato: moscato}
181
-	moscato.queue.queue_init()
190
+	err := moscato.queue.queue_init()
191
+	if err != nil {
192
+		fmt.Println(err)
193
+		return
194
+	}
195
+
196
+	moscato.MicroServiceManager = NewMStable()
182 197
 
183 198
 	//go routine -> matching 동작
184 199
 	go moscato.match_mng.matching(&moscato.queue)
185 200
 
186 201
 	//rpc 등록 -> Receive 함수
187
-	err := rpc.Register(receiver)
202
+	err = rpc.Register(receiver)
188 203
 	if err != nil {
189 204
 		log.Println(err)
190 205
 		return
@@ -196,7 +211,7 @@ func (moscato *Moscato) Run() {
196 211
 
197 212
 func Listen() {
198 213
 	//l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
199
-	l, err1 := net.Listen("tcp", ":8160")//MS로 부터 받는거
214
+	l, err1 := net.Listen("tcp", ":8160") //MS로 부터 받는거
200 215
 	//l2, err2 := net.Listen("tcp","0.0.0.0:8150")
201 216
 	if err1 != nil {
202 217
 		log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))

+ 22
- 12
src/broker/modules/manage.go Parādīt failu

@@ -1,26 +1,36 @@
1 1
 package modules
2 2
 
3 3
 //각 Microservice에 대한 정보 저장 노드
4
-type MSnode struct {
4
+type MSNode struct {
5 5
 	nodeName string
6 6
 	ipAddr   string
7 7
 }
8 8
 
9
-func (node *MSnode) Getname() string {
9
+func (node *MSNode) GetName() string {
10 10
 	return node.nodeName
11 11
 }
12
-func (node *MSnode) Getipaddr() string {
12
+func (node *MSNode) GetIpaddr() string {
13 13
 	return node.ipAddr
14 14
 }
15 15
 
16
+type NodeManager interface {
17
+	GetIpaddr(nodeName string) (string, bool)
18
+	AddMicroservice(node MSNode) bool
19
+	RemoveMicroservice(nodeName string) bool
20
+}
21
+
16 22
 //모든 Microservice정보 저장
17 23
 type MStable struct {
18
-	NodeTable map[string]MSnode
24
+	NodeTable map[string]MSNode
25
+}
26
+
27
+func NewMStable() *MStable {
28
+	return &MStable{NodeTable: make(map[string]MSNode)}
19 29
 }
20 30
 
21
-func (manager *MStable) getIpaddr(nodename string) (string, bool) {
31
+func (manager *MStable) GetIpaddr(nodeName string) (string, bool) {
22 32
 	//해당 이름의 노드이름이 존재하는지 확인
23
-	node, exists := manager.NodeTable[nodename]
33
+	node, exists := manager.NodeTable[nodeName]
24 34
 
25 35
 	//존재하지 않는 경우 nil리턴
26 36
 	if !exists {
@@ -30,24 +40,24 @@ func (manager *MStable) getIpaddr(nodename string) (string, bool) {
30 40
 	}
31 41
 }
32 42
 
33
-func (manager *MStable) add_microservice(node MSnode) bool {
43
+func (manager *MStable) AddMicroservice(node MSNode) bool {
34 44
 	//삽입 전 존재여부 확인
35
-	_, exists := manager.NodeTable[node.Getname()]
45
+	_, exists := manager.NodeTable[node.GetName()]
36 46
 
37 47
 	if exists {
38 48
 		return false
39 49
 	} else {
40
-		manager.NodeTable[node.Getname()] = node
50
+		manager.NodeTable[node.GetName()] = node
41 51
 		return true
42 52
 	}
43 53
 }
44 54
 
45
-func (manager *MStable) remove_microservice(nodename string) bool {
55
+func (manager *MStable) RemoveMicroservice(nodeName string) bool {
46 56
 	//삭제 전 존재여부 확인
47
-	_, exists := manager.NodeTable[nodename]
57
+	_, exists := manager.NodeTable[nodeName]
48 58
 
49 59
 	if exists {
50
-		delete(manager.NodeTable, nodename)
60
+		delete(manager.NodeTable, nodeName)
51 61
 		return true
52 62
 	} else {
53 63
 		return false

+ 28
- 29
src/broker/modules/matching.go Parādīt failu

@@ -8,14 +8,14 @@ import (
8 8
 )
9 9
 
10 10
 type match_manager struct {
11
-	match_count	int
11
+	match_count int
12 12
 }
13 13
 
14 14
 // Matching -> Return (list of IP addresses of matched subs, Pub Msg, error)
15 15
 func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager) ([]string, MsgUnit, error) {
16 16
 	msg := queue.pop(true)
17
-	topic := msg.(*PublishMsg).topic
18
-	value := msg.(*PublishMsg).value
17
+	topic := msg.(*PublishMsg).Topic
18
+	value := msg.(*PublishMsg).Value
19 19
 
20 20
 	// list
21 21
 	ret := make([]string, 0)
@@ -28,112 +28,111 @@ func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager)
28 28
 	topicPtr := sub_mng.list
29 29
 	pos := topicPtr.getTopicNodePos(topic)
30 30
 
31
-
32 31
 	// Don't Exist topicNode
33
-	if pos == nil{
34
-		return nil,nil,errors.New("Don't Exist topicNode")
32
+	if pos == nil {
33
+		return nil, nil, errors.New("Don't Exist topicNode")
35 34
 	}
36 35
 
37 36
 	// 2. Traverse all valueNode -> and Match
38 37
 	valPtr := pos.list.head
39
-	for valPtr != nil{
38
+	for valPtr != nil {
40 39
 		compare := Compare(valPtr.val, value)
41 40
 		if compare < 0 { // sub.val > pub.val
42
-		// single : { >, >= }
41
+			// single : { >, >= }
43 42
 
44 43
 			// (1) case : >
45
-			for i := 0; i < len(valPtr.single2sub_b); i++{
44
+			for i := 0; i < len(valPtr.single2sub_b); i++ {
46 45
 				sub := valPtr.single2sub_b[i]
47 46
 				ip := sub_mng.sub2ip[sub]
48 47
 				ret = append(ret, ip)
49 48
 			}
50 49
 
51 50
 			// (2) case : >=
52
-			for i := 0; i < len(valPtr.single2sub_eb); i++{
51
+			for i := 0; i < len(valPtr.single2sub_eb); i++ {
53 52
 				sub := valPtr.single2sub_eb[i]
54 53
 				ip := sub_mng.sub2ip[sub]
55 54
 				ret = append(ret, ip)
56 55
 			}
57
-		// range : { >, >= }
56
+			// range : { >, >= }
58 57
 
59 58
 			// (1) case : >
60
-			for i := 0; i < len(valPtr.range2sub_b); i++{
59
+			for i := 0; i < len(valPtr.range2sub_b); i++ {
61 60
 				sub := valPtr.range2sub_b[i]
62 61
 				big = append(big, sub)
63 62
 			}
64 63
 
65 64
 			// (2) case : >=
66
-			for i := 0; i < len(valPtr.range2sub_eb); i++{
65
+			for i := 0; i < len(valPtr.range2sub_eb); i++ {
67 66
 				sub := valPtr.range2sub_eb[i]
68 67
 				big = append(big, sub)
69 68
 			}
70 69
 
71 70
 		} else if compare > 0 { // sub.val < pub.val
72 71
 
73
-		// single : { <, <= }
72
+			// single : { <, <= }
74 73
 
75 74
 			// (1) case : <
76
-			for i := 0; i < len(valPtr.single2sub_s); i++{
75
+			for i := 0; i < len(valPtr.single2sub_s); i++ {
77 76
 				sub := valPtr.single2sub_s[i]
78 77
 				ip := sub_mng.sub2ip[sub]
79 78
 				ret = append(ret, ip)
80 79
 			}
81 80
 
82 81
 			// (2) case : <=
83
-			for i := 0; i < len(valPtr.single2sub_es); i++{
82
+			for i := 0; i < len(valPtr.single2sub_es); i++ {
84 83
 				sub := valPtr.single2sub_es[i]
85 84
 				ip := sub_mng.sub2ip[sub]
86 85
 				ret = append(ret, ip)
87 86
 			}
88 87
 
89
-		// range : { <, <= }
88
+			// range : { <, <= }
90 89
 
91 90
 			// (1) case : <
92
-			for i := 0; i < len(valPtr.range2sub_s); i++{
91
+			for i := 0; i < len(valPtr.range2sub_s); i++ {
93 92
 				sub := valPtr.range2sub_s[i]
94 93
 				small = append(small, sub)
95 94
 			}
96 95
 			// (2) case : <=
97
-			for i := 0; i < len(valPtr.range2sub_es); i++{
96
+			for i := 0; i < len(valPtr.range2sub_es); i++ {
98 97
 				sub := valPtr.range2sub_es[i]
99 98
 				small = append(small, sub)
100 99
 			}
101 100
 
102
-		} else{ // sub.val == pub.val
101
+		} else { // sub.val == pub.val
103 102
 
104
-		// single : { <=, >=, ==}
103
+			// single : { <=, >=, ==}
105 104
 
106 105
 			// (1) case : <=
107
-			for i := 0; i < len(valPtr.single2sub_es); i++{
106
+			for i := 0; i < len(valPtr.single2sub_es); i++ {
108 107
 				sub := valPtr.single2sub_es[i]
109 108
 				ip := sub_mng.sub2ip[sub]
110 109
 				ret = append(ret, ip)
111 110
 			}
112 111
 
113 112
 			// (2) case : >=
114
-			for i := 0; i < len(valPtr.single2sub_eb); i++{
113
+			for i := 0; i < len(valPtr.single2sub_eb); i++ {
115 114
 				sub := valPtr.single2sub_eb[i]
116 115
 				ip := sub_mng.sub2ip[sub]
117 116
 				ret = append(ret, ip)
118 117
 			}
119 118
 
120 119
 			// (3) case : ==
121
-			for i := 0; i < len(valPtr.single2sub_e); i++{
120
+			for i := 0; i < len(valPtr.single2sub_e); i++ {
122 121
 				sub := valPtr.single2sub_e[i]
123 122
 				ip := sub_mng.sub2ip[sub]
124 123
 				ret = append(ret, ip)
125 124
 			}
126 125
 
127
-		// range : { <=, >= }
126
+			// range : { <=, >= }
128 127
 
129 128
 			// (1) case : <=
130
-			for i := 0; i < len(valPtr.range2sub_es); i++{
129
+			for i := 0; i < len(valPtr.range2sub_es); i++ {
131 130
 				sub := valPtr.range2sub_es[i]
132 131
 				small = append(small, sub)
133 132
 			}
134 133
 
135 134
 			// (2) case : >=
136
-			for i := 0; i < len(valPtr.range2sub_eb); i++{
135
+			for i := 0; i < len(valPtr.range2sub_eb); i++ {
137 136
 				sub := valPtr.range2sub_eb[i]
138 137
 				big = append(big, sub)
139 138
 			}
@@ -145,7 +144,7 @@ func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager)
145 144
 	// Add the intersection IP address of two sets (large and small) to the return list
146 145
 	hash := intersect.Hash(small, big)
147 146
 	list := reflect.ValueOf(hash)
148
-	for i := 0;  i < list.Len(); i++{
147
+	for i := 0; i < list.Len(); i++ {
149 148
 		sub := list.Index(i).Interface().(int)
150 149
 		ip := sub_mng.sub2ip[sub]
151 150
 		ret = append(ret, ip)
@@ -154,4 +153,4 @@ func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager)
154 153
 	match_mng.match_count++
155 154
 
156 155
 	return ret, msg, nil
157
-}
156
+}

+ 1
- 1
src/broker/modules/message.go Parādīt failu

@@ -118,7 +118,7 @@ func (msg Message) CheckType() int {
118 118
 func NewKeyGenMsg(table *MStable) *KeyGenMsg {
119 119
 	m := &KeyGenMsg{}
120 120
 	for _, value := range table.NodeTable {
121
-		m.iptable = append(m.iptable, value.Getipaddr())
121
+		m.iptable = append(m.iptable, value.GetIpaddr())
122 122
 	}
123 123
 	return m
124 124
 }

+ 9
- 9
src/broker/modules/subscription.go Parādīt failu

@@ -8,10 +8,10 @@ type sub_manager struct {
8 8
 	list topicList
9 9
 
10 10
 	/* Manage sub# */
11
-	count_sub 	 int               // Subscription #
11
+	count_sub  int                  // Subscription #
12 12
 	emptylist  []int                // To administrate sub #
13 13
 	ip2sub     map[string][]int     // ip2sub[ip] = sub# ...
14
-	sub2ip	   map[int]string		// sub2ip[sub#] = ip
14
+	sub2ip     map[int]string       // sub2ip[sub#] = ip
15 15
 	sub2node   map[int][]*valueNode // sub2node[sub#] = node_addr ...
16 16
 	israngesub map[int]bool         // To manage when deleted
17 17
 }
@@ -42,14 +42,14 @@ func (manager *sub_manager) addSubscription(msg MsgUnit) error {
42 42
 
43 43
 	if len(manager.emptylist) == 0 {
44 44
 		subnumber = manager.count_sub
45
-		manager.ip2sub[msg.(*SubscriptionMsg).from] = append(manager.ip2sub[msg.(*SubscriptionMsg).from], manager.count_sub)
46
-		manager.sub2ip[subnumber] = msg.(*SubscriptionMsg).from
45
+		manager.ip2sub[msg.(*SubscriptionMsg).From] = append(manager.ip2sub[msg.(*SubscriptionMsg).From], manager.count_sub)
46
+		manager.sub2ip[subnumber] = msg.(*SubscriptionMsg).From
47 47
 		manager.count_sub++
48 48
 	} else {
49 49
 		subnumber := manager.emptylist[len(manager.emptylist)-1]
50 50
 		manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
51
-		manager.ip2sub[msg.(*SubscriptionMsg).from] = append(manager.ip2sub[msg.(*SubscriptionMsg).from], subnumber)
52
-		manager.sub2ip[subnumber] = msg.(*SubscriptionMsg).from
51
+		manager.ip2sub[msg.(*SubscriptionMsg).From] = append(manager.ip2sub[msg.(*SubscriptionMsg).From], subnumber)
52
+		manager.sub2ip[subnumber] = msg.(*SubscriptionMsg).From
53 53
 	}
54 54
 
55 55
 	nameptr := manager.list.head
@@ -83,7 +83,7 @@ func (manager *sub_manager) addSubscription(msg MsgUnit) error {
83 83
 		valptr.insertSub(operator[0], subnumber, true)
84 84
 		return nil // AddSubscription ok
85 85
 	} else {
86
-      
86
+
87 87
 		// For compound expressions bounded by '&&' and '||'
88 88
 		// (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
89 89
 		logical_operator := operator[1]
@@ -214,6 +214,6 @@ func (manager *sub_manager) delete(from string) error {
214 214
 			}
215 215
 		}
216 216
 	}
217
-	manager.ip2sub[ip]=nil // Delete sub#s mapped to Ip address
217
+	manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
218 218
 	return nil
219
-}
219
+}

Notiek ielāde…
Atcelt
Saglabāt