瀏覽代碼

Merge remote-tracking branch 'origin/master'

master
kidjung 4 年之前
父節點
當前提交
11a4291313
共有 2 個文件被更改,包括 134 次插入139 次删除
  1. 13
    19
      src/broker/modules/init.go
  2. 121
    120
      src/broker/modules/matching.go

+ 13
- 19
src/broker/modules/init.go 查看文件

@@ -4,35 +4,29 @@ import (
4 4
 	"encoding/json"
5 5
 	"errors"
6 6
 	"fmt"
7
-	"github.com/fatih/color"
7
+	_ "github.com/fatih/color"
8 8
 	"log"
9 9
 	"net"
10 10
 	"net/rpc"
11 11
 )
12 12
 
13 13
 // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
14
-//temporary type for matching manager
15
-//type match_manager struct{}
16
-
17
-func (match_mng *match_manager) matching(queue *MsgQueue) {
18
-	//msg := queue.pop(true)
19
-	//Implement here ~~
20
-}
21
-
22
-func (match_mng *match_manager) add_subscription(msg MsgUnit) {
23
-
24
-}
25
-
26
-//temporary type for secure(key) manager
27
-type secure_manager struct{}
28 14
 
29 15
 type Moscato struct {
30 16
 	queue               MsgQueue
17
+	SendQueue			chan myType
31 18
 	MicroServiceManager NodeManager `inject:""`
32 19
 	MatchingManager     match_manager
20
+	Subscriptionmanager	sub_manager
33 21
 	SecureManager       SecurityManager `inject:""`
34 22
 }
35 23
 
24
+type myType struct{
25
+	subList	[]string
26
+	pubMsg	MsgUnit
27
+	err		error
28
+}
29
+
36 30
 type Reply struct {
37 31
 	CompleteLog string
38 32
 }
@@ -144,7 +138,7 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
144 138
 		moscato.queue.push(msg.(PublishMsg))
145 139
 
146 140
 	case SM: //Subscription msg
147
-		moscato.MatchingManager.add_subscription(msg.(*SubscriptionMsg))
141
+		moscato.Subscriptionmanager.addSubscription(msg.(*SubscriptionMsg))
148 142
 
149 143
 	case RM: //Register msg
150 144
 		log.Println("RM received")
@@ -217,7 +211,7 @@ func (moscato *Moscato) Run() {
217 211
 	}
218 212
 
219 213
 	//go routine -> matching 동작
220
-	go moscato.MatchingManager.matching(&moscato.queue)
214
+	go moscato.Matching()
221 215
 	go moscato.CheckQueue()
222 216
 
223 217
 	//rpc 등록 -> Receive 함수
@@ -228,7 +222,7 @@ func (moscato *Moscato) Run() {
228 222
 	}
229 223
 
230 224
 	go Listen()
231
-	color.Blue("initializing complete.")
225
+	//color.Blue("initializing complete.")
232 226
 	fmt.Scanln()
233 227
 }
234 228
 
@@ -254,4 +248,4 @@ func Listen() {
254 248
 		conn, _ := l2.Accept()
255 249
 		go rpc.ServeConn(conn)
256 250
 	}*/
257
-}
251
+}

+ 121
- 120
src/broker/modules/matching.go 查看文件

@@ -12,11 +12,11 @@ type match_manager struct {
12 12
 }
13 13
 
14 14
 // Matching -> Return (list of IP addresses of matched subs, Pub Msg, error)
15
-func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager) ([]string, MsgUnit, error) {
16
-	msg := queue.pop(true)
15
+func (moscato * Moscato) Matching() {
16
+	msg := moscato.queue.pop(true)
17 17
 	topic := msg.(*PublishMsg).Topic
18 18
 	value := msg.(*PublishMsg).Value
19
-
19
+	sub_mng := moscato.Subscriptionmanager
20 20
 	// list
21 21
 	ret := make([]string, 0)
22 22
 
@@ -30,127 +30,128 @@ func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager)
30 30
 
31 31
 	// Don't Exist topicNode
32 32
 	if pos == nil {
33
-		return nil, nil, errors.New("Don't Exist topicNode")
34
-	}
35
-
36
-	// 2. Traverse all valueNode -> and Match
37
-	valPtr := pos.list.head
38
-	for valPtr != nil {
39
-		compare := Compare(valPtr.val, value)
40
-		if compare < 0 { // sub.val > pub.val
41
-			// single : { >, >= }
42
-
43
-			// (1) case : >
44
-			for i := 0; i < len(valPtr.single2sub_b); i++ {
45
-				sub := valPtr.single2sub_b[i]
46
-				ip := sub_mng.sub2ip[sub]
47
-				ret = append(ret, ip)
48
-			}
49
-
50
-			// (2) case : >=
51
-			for i := 0; i < len(valPtr.single2sub_eb); i++ {
52
-				sub := valPtr.single2sub_eb[i]
53
-				ip := sub_mng.sub2ip[sub]
54
-				ret = append(ret, ip)
55
-			}
56
-			// range : { >, >= }
57
-
58
-			// (1) case : >
59
-			for i := 0; i < len(valPtr.range2sub_b); i++ {
60
-				sub := valPtr.range2sub_b[i]
61
-				big = append(big, sub)
62
-			}
63
-
64
-			// (2) case : >=
65
-			for i := 0; i < len(valPtr.range2sub_eb); i++ {
66
-				sub := valPtr.range2sub_eb[i]
67
-				big = append(big, sub)
68
-			}
69
-
70
-		} else if compare > 0 { // sub.val < pub.val
71
-
72
-			// single : { <, <= }
73
-
74
-			// (1) case : <
75
-			for i := 0; i < len(valPtr.single2sub_s); i++ {
76
-				sub := valPtr.single2sub_s[i]
77
-				ip := sub_mng.sub2ip[sub]
78
-				ret = append(ret, ip)
79
-			}
33
+		moscato.SendQueue <- myType{nil, msg, errors.New("Don't Exist Matching Topic")}
34
+	} else {
35
+		// 2. Traverse all valueNode -> and Match
36
+		valPtr := pos.list.head
37
+		for valPtr != nil {
38
+			compare := Compare(valPtr.val, value)
39
+			if compare < 0 { // sub.val > pub.val
40
+				// single : { >, >= }
41
+
42
+				// (1) case : >
43
+				for i := 0; i < len(valPtr.single2sub_b); i++ {
44
+					sub := valPtr.single2sub_b[i]
45
+					ip := sub_mng.sub2ip[sub]
46
+					ret = append(ret, ip)
47
+				}
48
+
49
+				// (2) case : >=
50
+				for i := 0; i < len(valPtr.single2sub_eb); i++ {
51
+					sub := valPtr.single2sub_eb[i]
52
+					ip := sub_mng.sub2ip[sub]
53
+					ret = append(ret, ip)
54
+				}
55
+				// range : { >, >= }
56
+
57
+				// (1) case : >
58
+				for i := 0; i < len(valPtr.range2sub_b); i++ {
59
+					sub := valPtr.range2sub_b[i]
60
+					big = append(big, sub)
61
+				}
62
+
63
+				// (2) case : >=
64
+				for i := 0; i < len(valPtr.range2sub_eb); i++ {
65
+					sub := valPtr.range2sub_eb[i]
66
+					big = append(big, sub)
67
+				}
68
+
69
+			} else if compare > 0 { // sub.val < pub.val
70
+
71
+				// single : { <, <= }
72
+
73
+				// (1) case : <
74
+				for i := 0; i < len(valPtr.single2sub_s); i++ {
75
+					sub := valPtr.single2sub_s[i]
76
+					ip := sub_mng.sub2ip[sub]
77
+					ret = append(ret, ip)
78
+				}
79
+
80
+				// (2) case : <=
81
+				for i := 0; i < len(valPtr.single2sub_es); i++ {
82
+					sub := valPtr.single2sub_es[i]
83
+					ip := sub_mng.sub2ip[sub]
84
+					ret = append(ret, ip)
85
+				}
86
+
87
+				// range : { <, <= }
88
+
89
+				// (1) case : <
90
+				for i := 0; i < len(valPtr.range2sub_s); i++ {
91
+					sub := valPtr.range2sub_s[i]
92
+					small = append(small, sub)
93
+				}
94
+				// (2) case : <=
95
+				for i := 0; i < len(valPtr.range2sub_es); i++ {
96
+					sub := valPtr.range2sub_es[i]
97
+					small = append(small, sub)
98
+				}
99
+
100
+			} else { // sub.val == pub.val
101
+
102
+				// single : { <=, >=, ==}
103
+
104
+				// (1) case : <=
105
+				for i := 0; i < len(valPtr.single2sub_es); i++ {
106
+					sub := valPtr.single2sub_es[i]
107
+					ip := sub_mng.sub2ip[sub]
108
+					ret = append(ret, ip)
109
+				}
110
+
111
+				// (2) case : >=
112
+				for i := 0; i < len(valPtr.single2sub_eb); i++ {
113
+					sub := valPtr.single2sub_eb[i]
114
+					ip := sub_mng.sub2ip[sub]
115
+					ret = append(ret, ip)
116
+				}
117
+
118
+				// (3) case : ==
119
+				for i := 0; i < len(valPtr.single2sub_e); i++ {
120
+					sub := valPtr.single2sub_e[i]
121
+					ip := sub_mng.sub2ip[sub]
122
+					ret = append(ret, ip)
123
+				}
124
+
125
+				// range : { <=, >= }
126
+
127
+				// (1) case : <=
128
+				for i := 0; i < len(valPtr.range2sub_es); i++ {
129
+					sub := valPtr.range2sub_es[i]
130
+					small = append(small, sub)
131
+				}
132
+
133
+				// (2) case : >=
134
+				for i := 0; i < len(valPtr.range2sub_eb); i++ {
135
+					sub := valPtr.range2sub_eb[i]
136
+					big = append(big, sub)
137
+				}
80 138
 
81
-			// (2) case : <=
82
-			for i := 0; i < len(valPtr.single2sub_es); i++ {
83
-				sub := valPtr.single2sub_es[i]
84
-				ip := sub_mng.sub2ip[sub]
85
-				ret = append(ret, ip)
86
-			}
87
-
88
-			// range : { <, <= }
89
-
90
-			// (1) case : <
91
-			for i := 0; i < len(valPtr.range2sub_s); i++ {
92
-				sub := valPtr.range2sub_s[i]
93
-				small = append(small, sub)
94
-			}
95
-			// (2) case : <=
96
-			for i := 0; i < len(valPtr.range2sub_es); i++ {
97
-				sub := valPtr.range2sub_es[i]
98
-				small = append(small, sub)
99
-			}
100
-
101
-		} else { // sub.val == pub.val
102
-
103
-			// single : { <=, >=, ==}
104
-
105
-			// (1) case : <=
106
-			for i := 0; i < len(valPtr.single2sub_es); i++ {
107
-				sub := valPtr.single2sub_es[i]
108
-				ip := sub_mng.sub2ip[sub]
109
-				ret = append(ret, ip)
110
-			}
111
-
112
-			// (2) case : >=
113
-			for i := 0; i < len(valPtr.single2sub_eb); i++ {
114
-				sub := valPtr.single2sub_eb[i]
115
-				ip := sub_mng.sub2ip[sub]
116
-				ret = append(ret, ip)
117
-			}
118
-
119
-			// (3) case : ==
120
-			for i := 0; i < len(valPtr.single2sub_e); i++ {
121
-				sub := valPtr.single2sub_e[i]
122
-				ip := sub_mng.sub2ip[sub]
123
-				ret = append(ret, ip)
124
-			}
125
-
126
-			// range : { <=, >= }
127
-
128
-			// (1) case : <=
129
-			for i := 0; i < len(valPtr.range2sub_es); i++ {
130
-				sub := valPtr.range2sub_es[i]
131
-				small = append(small, sub)
132 139
 			}
140
+			valPtr = valPtr.next
141
+		}
133 142
 
134
-			// (2) case : >=
135
-			for i := 0; i < len(valPtr.range2sub_eb); i++ {
136
-				sub := valPtr.range2sub_eb[i]
137
-				big = append(big, sub)
138
-			}
143
+		//fmt.Println("hi")
139 144
 
145
+		// Add the intersection IP address of two sets (large and small) to the return list
146
+		hash := intersect.Hash(small, big)
147
+		list := reflect.ValueOf(hash)
148
+		for i := 0; i < list.Len(); i++ {
149
+			sub := list.Index(i).Interface().(int)
150
+			ip := sub_mng.sub2ip[sub]
151
+			ret = append(ret, ip)
140 152
 		}
141
-		valPtr = valPtr.next
142
-	}
143 153
 
144
-	// Add the intersection IP address of two sets (large and small) to the return list
145
-	hash := intersect.Hash(small, big)
146
-	list := reflect.ValueOf(hash)
147
-	for i := 0; i < list.Len(); i++ {
148
-		sub := list.Index(i).Interface().(int)
149
-		ip := sub_mng.sub2ip[sub]
150
-		ret = append(ret, ip)
154
+		moscato.MatchingManager.match_count++
155
+		moscato.SendQueue <- myType{ret, msg, nil}
151 156
 	}
152
-
153
-	match_mng.match_count++
154
-
155
-	return ret, msg, nil
156
-}
157
+}

Loading…
取消
儲存