extra1563 преди 4 години
родител
ревизия
8c7d57fe9a
променени са 1 файла, в които са добавени 119 реда и са изтрити 287 реда
  1. 119
    287
      src/broker/modules/matching.go

+ 119
- 287
src/broker/modules/matching.go Целия файл

@@ -2,323 +2,155 @@ package modules
2 2
 
3 3
 import (
4 4
 	"errors"
5
+	_ "errors"
6
+	"github.com/juliangruber/go-intersect"
7
+	"reflect"
5 8
 )
6 9
 
7
-type sub_manager struct {
8
-	list topicList
9
-
10
-	/* Manage sub# */
11
-	count_sub  int                  // Subscription #
12
-	emptylist  []int                // To administrate sub #
13
-	ip2sub     map[string][]int     // ip2sub[ip] = sub# ...
14
-	sub2ip     map[int]string       // sub2ip[sub#] = ip
15
-	sub2node   map[int][]nodeInfo // sub2node[sub#] = node_addr ...
16
-	israngesub map[int]bool         // To manage when deleted
17
-}
18
-
19
-type nodeInfo struct{
20
-	valNodeList []*valueNode
21
-	topic []int64
22
-}
23
-
24
-func (manager *sub_manager) Initialize() {
25
-	// Some initialize
26
-	manager.ip2sub = make(map[string][]int)
27
-	manager.sub2ip = make(map[int]string)
28
-	manager.sub2node = make(map[int][]nodeInfo)
29
-	manager.israngesub = make(map[int]bool)
30
-}
31
-
32
-func newSubmng() *sub_manager {
33
-	subMng := &sub_manager{}
34
-	subMng.Initialize()
35
-	return subMng
10
+type match_manager struct {
11
+	match_count int
36 12
 }
37 13
 
38
-func (manager *sub_manager) isDuplicated(msg MsgUnit) bool{
39
-	from := msg.(SubscriptionMsg).From
40
-	topic := msg.(SubscriptionMsg).Topic
41
-	value := msg.(SubscriptionMsg).Value
42
-	operator := msg.(SubscriptionMsg).Operator
43
-	subList := manager.ip2sub[from]
44
-	canFind := false
14
+// Matching -> Return (list of IP addresses of matched subs, Pub Msg, error)
15
+func (moscato *Moscato) Matching(msg MsgUnit) {
16
+	//msg := moscato.queue.pop(true)
17
+	topic := msg.(PublishMsg).Topic
18
+	value := msg.(PublishMsg).Value
19
+	sub_mng := moscato.SubscriptionManager
45 20
 
46
-	for i := 0; i < len(subList) && canFind == false ; i++{
47
-		sub := subList[i]
48
-		nodeinfoList := manager.sub2node[sub]
21
+	// listß
22
+	ret := make([]string, 0)
49 23
 
50
-		for j := 0; j < len(nodeinfoList); j++{
51
-			node := nodeinfoList[j]
24
+	// list for matched range subscriptions
25
+	big := make([]int, 0)
26
+	small := make([]int, 0)
52 27
 
53
-			if Compare(node.topic, topic) == 0 {
54
-				cnt := 0
55
-				if len(operator) == 1{
56
-					valPtr := node.valNodeList[0]
57
-					if Compare(valPtr.val ,value) == 0{
58
-						op := operator[0]
59
-						cnt += findOperatorList(op, valPtr, sub, true)
60
-					}
61
-				} else { // value가 숫자일 때
62
-					leftop := operator[0]
63
-					logicalop := operator[1]
64
-					rightop := operator[2]
28
+	// 1. Find (topicNode[Topic] == msg.Topic) Node
29
+	topicPtr := sub_mng.list
30
+	pos := topicPtr.getTopicNodePos(topic)
65 31
 
66
-					leftValuePtr := node.valNodeList[0]
67
-					rightValuePtr := node.valNodeList[1]
32
+	// Don't Exist topicNode
33
+	if pos == nil {
34
+		moscato.SendQueue <- myType{nil, msg, errors.New("Don't Exist Matching Topic")}
35
+	} else {
36
+		// 2. Traverse all valueNode -> and Match
37
+		valPtr := pos.list.head
38
+		for valPtr != nil {
39
+			compare := Compare(valPtr.val, value)
40
+			if compare < 0 { // sub.val > pub.val
41
+				// single : { >, >= }
42
+
43
+				// (1) case : >
44
+				for i := 0; i < len(valPtr.single2sub_b); i++ {
45
+					sub := valPtr.single2sub_b[i]
46
+					ip := sub_mng.sub2ip[sub]
47
+					ret = append(ret, ip)
48
+				}
68 49
 
69
-					nodeValList := []int64{leftValuePtr.val[0], rightValuePtr.val[0]}
50
+				// (2) case : >=
51
+				for i := 0; i < len(valPtr.single2sub_eb); i++ {
52
+					sub := valPtr.single2sub_eb[i]
53
+					ip := sub_mng.sub2ip[sub]
54
+					ret = append(ret, ip)
55
+				}
56
+				// range : { >, >= }
70 57
 
71
-					if Compare(nodeValList, value) == 0 {
72
-						if logicalop == "&&" {
73
-							cnt += findOperatorList(leftop, leftValuePtr, sub, false)
74
-							cnt += findOperatorList(rightop, rightValuePtr, sub, false)
75
-						} else {
76
-							cnt += findOperatorList(leftop, leftValuePtr, sub, true)
77
-							cnt += findOperatorList(rightop, rightValuePtr, sub, true)
78
-						}
79
-					}
58
+				// (1) case : >
59
+				for i := 0; i < len(valPtr.range2sub_b); i++ {
60
+					sub := valPtr.range2sub_b[i]
61
+					big = append(big, sub)
80 62
 				}
81
-				if cnt == len(node.valNodeList){
82
-					canFind = true
83
-					break
63
+
64
+				// (2) case : >=
65
+				for i := 0; i < len(valPtr.range2sub_eb); i++ {
66
+					sub := valPtr.range2sub_eb[i]
67
+					big = append(big, sub)
84 68
 				}
85
-			}
86
-		}
87
-	}
88
-	return canFind
89
-}
90 69
 
91
-// Returns if sub exists in the operator list in the same node as op.
92
-func findOperatorList(op string, node *valueNode, sub int, issingle bool) int {
93
-	ret := -1
94
-	if issingle == true {
95
-		switch op {
96
-		case "<":
97
-			ret = findSub(node.single2sub_s, sub)
98
-		case "<=":
99
-			ret = findSub(node.single2sub_es, sub)
100
-		case ">":
101
-			ret = findSub(node.single2sub_b, sub)
102
-		case ">=":
103
-			ret = findSub(node.single2sub_eb, sub)
104
-		case "==":
105
-			ret = findSub(node.single2sub_e, sub)
106
-		}
107
-	} else {
108
-		switch op {
109
-		case "<":
110
-			ret = findSub(node.range2sub_s, sub)
111
-		case "<=":
112
-			ret = findSub(node.range2sub_es, sub)
113
-		case ">":
114
-			ret = findSub(node.range2sub_b, sub)
115
-		case ">=":
116
-			ret = findSub(node.range2sub_eb, sub)
117
-		}
118
-	}
119
-	if ret < 0{
120
-		return 0
121
-	} else{
122
-		return 1
123
-	}
124
-}
70
+			} else if compare > 0 { // sub.val < pub.val
125 71
 
126
-//	### To Insert sub#
127
-func (manager *sub_manager) addSubscription(msg MsgUnit) error {
72
+				// single : { <, <= }
128 73
 
129
-	topic := msg.(SubscriptionMsg).Topic
130
-	value := msg.(SubscriptionMsg).Value
131
-	operator := msg.(SubscriptionMsg).Operator
132
-	subnumber := 0
74
+				// (1) case : <
75
+				for i := 0; i < len(valPtr.single2sub_s); i++ {
76
+					sub := valPtr.single2sub_s[i]
77
+					ip := sub_mng.sub2ip[sub]
78
+					ret = append(ret, ip)
79
+				}
133 80
 
134
-	// 0.  Check the same ip and sub information
135
-	if manager.isDuplicated(msg) == true {
136
-		return errors.New("Duplicater Subscription")
137
-	}
81
+				// (2) case : <=
82
+				for i := 0; i < len(valPtr.single2sub_es); i++ {
83
+					sub := valPtr.single2sub_es[i]
84
+					ip := sub_mng.sub2ip[sub]
85
+					ret = append(ret, ip)
86
+				}
138 87
 
139
-	// 1. Mapping incoming IP address to sub #
140
-	if len(manager.emptylist) == 0 {
141
-		subnumber = manager.count_sub
142
-		manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], manager.count_sub)
143
-		manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
144
-		manager.count_sub++
145
-	} else {
146
-		subnumber := manager.emptylist[len(manager.emptylist)-1]
147
-		manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
148
-		manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], subnumber)
149
-		manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
150
-	}
88
+				// range : { <, <= }
151 89
 
152
-	nameptr := manager.list.head
153
-	findOk := false
90
+				// (1) case : <
91
+				for i := 0; i < len(valPtr.range2sub_s); i++ {
92
+					sub := valPtr.range2sub_s[i]
93
+					small = append(small, sub)
94
+				}
95
+				// (2) case : <=
96
+				for i := 0; i < len(valPtr.range2sub_es); i++ {
97
+					sub := valPtr.range2sub_es[i]
98
+					small = append(small, sub)
99
+				}
154 100
 
155
-	// * 2. Add Subscription
101
+			} else { // sub.val == pub.val
156 102
 
157
-	// Find name in namelist, add if not found
158
-	for nameptr != nil {
159
-		if Compare(nameptr.topic, topic) == 0 {
160
-			findOk = true
161
-			break
162
-		}
163
-		nameptr = nameptr.next
164
-	}
103
+				// single : { <=, >=, ==}
165 104
 
105
+				// (1) case : <=
106
+				for i := 0; i < len(valPtr.single2sub_es); i++ {
107
+					sub := valPtr.single2sub_es[i]
108
+					ip := sub_mng.sub2ip[sub]
109
+					ret = append(ret, ip)
110
+				}
166 111
 
167
-	if !findOk {
168
-		manager.list.addTopicNode(topic)
169
-		nameptr = manager.list.tail
170
-	}
112
+				// (2) case : >=
113
+				for i := 0; i < len(valPtr.single2sub_eb); i++ {
114
+					sub := valPtr.single2sub_eb[i]
115
+					ip := sub_mng.sub2ip[sub]
116
+					ret = append(ret, ip)
117
+				}
171 118
 
172
-	var addValNodeList []*valueNode
119
+				// (3) case : ==
120
+				for i := 0; i < len(valPtr.single2sub_e); i++ {
121
+					sub := valPtr.single2sub_e[i]
122
+					ip := sub_mng.sub2ip[sub]
123
+					ret = append(ret, ip)
124
+				}
173 125
 
174
-	// Add Value to list[name]
175
-	if len(operator) == 1 { // if single expression
176
-		valptr := nameptr.list.getValueNodePos(value)
177
-		if valptr == nil {
178
-			nameptr.list.addValueNode(value)
179
-			valptr = nameptr.list.tail
180
-		}
181
-		addValNodeList = append(addValNodeList, valptr)
182
-		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
183
-		valptr.insertSub(operator[0], subnumber, true)
184
-		return nil // AddSubscription ok
185
-	} else {
186
-		// {">" "&&" "<"}
187
-		// For compound expressions bounded by '&&' and '||'
188
-		// (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
126
+				// range : { <=, >= }
189 127
 
190
-		logical_operator := operator[2]
128
+				// (1) case : <=
129
+				for i := 0; i < len(valPtr.range2sub_es); i++ {
130
+					sub := valPtr.range2sub_es[i]
131
+					small = append(small, sub)
132
+				}
191 133
 
192
-		// Find ValueNode = (namelist[name].list.val == Value)
193
-		valptr1 := nameptr.list.getValueNodePos([]int64{value[0]})
194
-		valptr2 := nameptr.list.getValueNodePos([]int64{value[1]})
134
+				// (2) case : >=
135
+				for i := 0; i < len(valPtr.range2sub_eb); i++ {
136
+					sub := valPtr.range2sub_eb[i]
137
+					big = append(big, sub)
138
+				}
195 139
 
196
-		if valptr1 == nil {
197
-			nameptr.list.addValueNode([]int64{value[0]})
198
-			valptr1 = nameptr.list.tail
199
-		}
200
-		if valptr2 == nil {
201
-			nameptr.list.addValueNode([]int64{value[1]})
202
-			valptr2 = nameptr.list.tail
140
+			}
141
+			valPtr = valPtr.next
203 142
 		}
204
-		addValNodeList = append(addValNodeList, valptr1)
205
-		addValNodeList = append(addValNodeList, valptr2)
206
-		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
207
-		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
208 143
 
209
-		if logical_operator == "&&" {
210
-			// If they are enclosed in '&&' -> Insert Value to range_operator_list
211
-			manager.israngesub[subnumber] = true
212
-			valptr1.insertSub(operator[0], subnumber, false)
213
-			valptr2.insertSub(operator[2], subnumber, false)
214
-
215
-		} else {
216
-			// if they are enclosed in '||' -> Insert Value to single_operator_list
217
-			valptr1.insertSub(operator[0], subnumber, true)
218
-			valptr2.insertSub(operator[2], subnumber, true)
144
+		// Add the intersection IP address of two sets (large and small) to the return list
145
+		hash := intersect.Hash(small, big)
146
+		list := reflect.ValueOf(hash)
147
+		for i := 0; i < list.Len(); i++ {
148
+			sub := list.Index(i).Interface().(int)
149
+			ip := sub_mng.sub2ip[sub]
150
+			ret = append(ret, ip)
219 151
 		}
220 152
 
221
-		return nil // addSubscription ok
222
-	}
223
-	return errors.New("Can't addSubscription")
224
-}
225
-
226
-// To delete subscription
227
-func (manager *sub_manager) delete(from string) error {
228
-
229
-	ip := from
230
-	cand := manager.ip2sub[ip]
231
-
232
-	for i := 0; i < len(cand); i++ {
233
-
234
-		sub := cand[i]
235
-		for j := 0; j < len(manager.sub2node[sub]); j++ {
236
-			nodeinfo := manager.sub2node[sub][j]
237
-			node := nodeinfo.valNodeList
238
-			if manager.israngesub[sub] {
239
-				for k := 0; k < len(node); k++ {
240
-					pos := findSub(node[k].range2sub_s, sub)
241
-					if pos != -1 {
242
-						node[k].range2sub_s = remove(node[k].range2sub_s, pos)
243
-						manager.emptylist = append(manager.emptylist, sub)
244
-					}
245
-
246
-					pos = findSub(node[k].range2sub_es, sub)
247
-					if pos != -1 {
248
-						node[k].range2sub_es = remove(node[k].range2sub_es, pos)
249
-						manager.emptylist = append(manager.emptylist, sub)
250
-					}
251
-
252
-					pos = findSub(node[k].range2sub_b, sub)
253
-					if pos != -1 {
254
-						node[k].range2sub_b = remove(node[k].range2sub_b, pos)
255
-						manager.emptylist = append(manager.emptylist, sub)
256
-					}
257
-
258
-					pos = findSub(node[k].range2sub_eb, sub)
259
-					if pos != -1 {
260
-						node[k].range2sub_eb = remove(node[k].range2sub_eb, pos)
261
-						manager.emptylist = append(manager.emptylist, sub)
262
-					}
263
-
264
-					isempty := node[k].isEmpty()
265
-
266
-					// Delete if Value Node is empty
267
-					if isempty && node[k] != nil {
268
-						prev_node := node[k].prev
269
-						next_node := node[k].next
270
-
271
-						prev_node.next = node[k].next
272
-						next_node.prev = node[k].prev
273
-					}
274
-				}
275
-			} else {
276
-				for k := 0; k < len(node); k++ {
277
-					pos := findSub(node[k].single2sub_s, sub)
278
-					if pos != -1 {
279
-						node[k].single2sub_s = remove(node[k].single2sub_s, pos)
280
-						manager.emptylist = append(manager.emptylist, sub)
281
-					}
282
-
283
-					pos = findSub(node[k].single2sub_es, sub)
284
-					if pos != -1 {
285
-						node[k].single2sub_es = remove(node[k].single2sub_es, pos)
286
-						manager.emptylist = append(manager.emptylist, sub)
287
-					}
288
-
289
-					pos = findSub(node[k].single2sub_b, sub)
290
-					if pos != -1 {
291
-						node[k].single2sub_b = remove(node[k].single2sub_b, pos)
292
-						manager.emptylist = append(manager.emptylist, sub)
293
-					}
294
-
295
-					pos = findSub(node[k].single2sub_eb, sub)
296
-					if pos != -1 {
297
-						node[k].single2sub_eb = remove(node[k].single2sub_eb, pos)
298
-						manager.emptylist = append(manager.emptylist, sub)
299
-					}
300
-
301
-					pos = findSub(node[k].single2sub_e, sub)
302
-					if pos != -1 {
303
-						node[k].single2sub_e = remove(node[k].single2sub_e, pos)
304
-						manager.emptylist = append(manager.emptylist, sub)
305
-					}
306
-
307
-					isempty := node[k].isEmpty()
308
-
309
-					// Delete if Value Node is empty
310
-					if isempty && node[k] != nil {
311
-						prevNode := node[k].prev
312
-						nextNode := node[k].next
313
-
314
-						prevNode.next = node[k].next
315
-						nextNode.prev = node[k].prev
316
-					}
317
-				}
318
-			}
319
-		}
320
-		manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
321
-		return nil
153
+		moscato.MatchingManager.match_count++
154
+		moscato.SendQueue <- myType{ret, msg, nil}
322 155
 	}
323
-	return errors.New("Don't Exist Subscription to delete")
324 156
 }

Loading…
Отказ
Запис