extra1563 4 лет назад
Родитель
Сommit
6bc7414c26
1 измененных файлов: 37 добавлений и 22 удалений
  1. 37
    22
      src/broker/modules/matching.go

+ 37
- 22
src/broker/modules/matching.go Просмотреть файл

8
 )
8
 )
9
 
9
 
10
 type match_manager struct {
10
 type match_manager struct {
11
-	match_count int
11
+	match_count int // Number of successful matches
12
 }
12
 }
13
 
13
 
14
-// Matching -> Return (list of IP addresses of matched subs, Pub Msg, error)
14
+// Matching operation method
15
+// 1. Traverse TopicList for match (msg.topic == topicNode.topic)
16
+// 2. Traverse TopicNode.ValueList for match (msg.value <operator> valueNode.value)
17
+// 3. Add ipaddr to ipList when successful matching
18
+// 4. Add ipaddr to ipList when range matching
19
+// 5. Delete Duplicated Ipaddrs
20
+// @Return (list of IP addresses of matched subs, Pub Msg, error)
15
 func (moscato *Moscato) Matching(msg MsgUnit) {
21
 func (moscato *Moscato) Matching(msg MsgUnit) {
16
-	//msg := moscato.queue.pop(true)
22
+
17
 	topic := msg.(PublishMsg).Topic
23
 	topic := msg.(PublishMsg).Topic
18
 	value := msg.(PublishMsg).Value
24
 	value := msg.(PublishMsg).Value
19
 	sub_mng := moscato.SubscriptionManager
25
 	sub_mng := moscato.SubscriptionManager
20
 
26
 
21
-	// listß
22
-	ret := make([]string, 0)
27
+	// iplist for return
28
+	ipList := make([]string, 0)
23
 
29
 
24
 	// list for matched range subscriptions
30
 	// list for matched range subscriptions
25
 	big := make([]int, 0)
31
 	big := make([]int, 0)
26
 	small := make([]int, 0)
32
 	small := make([]int, 0)
27
 
33
 
28
-	// 1. Find (topicNode[Topic] == msg.Topic) Node
34
+	// Find (topicNode[Topic] == msg.Topic) Node
29
 	topicPtr := sub_mng.list
35
 	topicPtr := sub_mng.list
30
 	pos := topicPtr.getTopicNodePos(topic)
36
 	pos := topicPtr.getTopicNodePos(topic)
31
 
37
 
33
 	if pos == nil {
39
 	if pos == nil {
34
 		moscato.SendQueue <- myType{nil, msg, errors.New("Don't Exist Matching Topic")}
40
 		moscato.SendQueue <- myType{nil, msg, errors.New("Don't Exist Matching Topic")}
35
 	} else {
41
 	} else {
36
-		// 2. Traverse all valueNode -> and Match
42
+		// Traverse all valueNode -> and Match
37
 		valPtr := pos.list.head
43
 		valPtr := pos.list.head
38
 		for valPtr != nil {
44
 		for valPtr != nil {
39
 			compare := Compare(valPtr.val, value)
45
 			compare := Compare(valPtr.val, value)
44
 				for i := 0; i < len(valPtr.single2sub_b); i++ {
50
 				for i := 0; i < len(valPtr.single2sub_b); i++ {
45
 					sub := valPtr.single2sub_b[i]
51
 					sub := valPtr.single2sub_b[i]
46
 					ip := sub_mng.sub2ip[sub]
52
 					ip := sub_mng.sub2ip[sub]
47
-					ret = append(ret, ip)
53
+					ipList = append(ipList, ip)
48
 				}
54
 				}
49
 
55
 
50
 				// (2) case : >=
56
 				// (2) case : >=
51
 				for i := 0; i < len(valPtr.single2sub_eb); i++ {
57
 				for i := 0; i < len(valPtr.single2sub_eb); i++ {
52
 					sub := valPtr.single2sub_eb[i]
58
 					sub := valPtr.single2sub_eb[i]
53
 					ip := sub_mng.sub2ip[sub]
59
 					ip := sub_mng.sub2ip[sub]
54
-					ret = append(ret, ip)
60
+					ipList = append(ipList, ip)
55
 				}
61
 				}
56
 				// range : { >, >= }
62
 				// range : { >, >= }
57
 
63
 
75
 				for i := 0; i < len(valPtr.single2sub_s); i++ {
81
 				for i := 0; i < len(valPtr.single2sub_s); i++ {
76
 					sub := valPtr.single2sub_s[i]
82
 					sub := valPtr.single2sub_s[i]
77
 					ip := sub_mng.sub2ip[sub]
83
 					ip := sub_mng.sub2ip[sub]
78
-					ret = append(ret, ip)
84
+					ipList = append(ipList, ip)
79
 				}
85
 				}
80
 
86
 
81
 				// (2) case : <=
87
 				// (2) case : <=
82
 				for i := 0; i < len(valPtr.single2sub_es); i++ {
88
 				for i := 0; i < len(valPtr.single2sub_es); i++ {
83
 					sub := valPtr.single2sub_es[i]
89
 					sub := valPtr.single2sub_es[i]
84
 					ip := sub_mng.sub2ip[sub]
90
 					ip := sub_mng.sub2ip[sub]
85
-					ret = append(ret, ip)
91
+					ipList = append(ipList, ip)
86
 				}
92
 				}
87
 
93
 
88
 				// range : { <, <= }
94
 				// range : { <, <= }
106
 				for i := 0; i < len(valPtr.single2sub_es); i++ {
112
 				for i := 0; i < len(valPtr.single2sub_es); i++ {
107
 					sub := valPtr.single2sub_es[i]
113
 					sub := valPtr.single2sub_es[i]
108
 					ip := sub_mng.sub2ip[sub]
114
 					ip := sub_mng.sub2ip[sub]
109
-					ret = append(ret, ip)
115
+					ipList = append(ipList, ip)
110
 				}
116
 				}
111
 
117
 
112
 				// (2) case : >=
118
 				// (2) case : >=
113
 				for i := 0; i < len(valPtr.single2sub_eb); i++ {
119
 				for i := 0; i < len(valPtr.single2sub_eb); i++ {
114
 					sub := valPtr.single2sub_eb[i]
120
 					sub := valPtr.single2sub_eb[i]
115
 					ip := sub_mng.sub2ip[sub]
121
 					ip := sub_mng.sub2ip[sub]
116
-					ret = append(ret, ip)
122
+					ipList = append(ipList, ip)
117
 				}
123
 				}
118
 
124
 
119
 				// (3) case : ==
125
 				// (3) case : ==
120
 				for i := 0; i < len(valPtr.single2sub_e); i++ {
126
 				for i := 0; i < len(valPtr.single2sub_e); i++ {
121
 					sub := valPtr.single2sub_e[i]
127
 					sub := valPtr.single2sub_e[i]
122
 					ip := sub_mng.sub2ip[sub]
128
 					ip := sub_mng.sub2ip[sub]
123
-					ret = append(ret, ip)
129
+					ipList = append(ipList, ip)
124
 				}
130
 				}
125
 
131
 
126
 				// range : { <=, >= }
132
 				// range : { <=, >= }
142
 		}
148
 		}
143
 
149
 
144
 		// Add the intersection IP address of two sets (large and small) to the return list
150
 		// Add the intersection IP address of two sets (large and small) to the return list
145
-		hash := intersect.Hash(small, big)
146
-		list := reflect.ValueOf(hash)
151
+		intersectSubnumbers := intersect.Hash(small, big)
152
+		list := reflect.ValueOf(intersectSubnumbers)
147
 		for i := 0; i < list.Len(); i++ {
153
 		for i := 0; i < list.Len(); i++ {
148
 			sub := list.Index(i).Interface().(int)
154
 			sub := list.Index(i).Interface().(int)
149
 			ip := sub_mng.sub2ip[sub]
155
 			ip := sub_mng.sub2ip[sub]
150
-			ret = append(ret, ip)
156
+			ipList = append(ipList, ip)
151
 		}
157
 		}
152
-		//fmt.Println("big = ", big)
153
-		//fmt.Println("small = ", small)
154
-		//fmt.Println("ret = ", ret)
155
 		moscato.MatchingManager.match_count++
158
 		moscato.MatchingManager.match_count++
156
-		moscato.SendQueue <- myType{ret, msg, nil}
159
+
160
+		// To delete Duplicated Ipaddr
161
+		keys := make(map[string]bool)
162
+		retIpList := []string{}
163
+		for _, val := range ipList {
164
+			if _, saveVal := keys[val]; !saveVal{
165
+				keys[val] = true
166
+				retIpList = append(retIpList, val)
167
+			}
168
+		}
169
+
170
+		// Return {IpaddressList, PubMsg, ErrorMsg} to SendQueue
171
+		moscato.SendQueue <- myType{retIpList, msg, nil}
157
 	}
172
 	}
158
-}
173
+}

Загрузка…
Отмена
Сохранить