Selaa lähdekoodia

[feat] Add method to check same ip and sub information

master
extra1563 4 vuotta sitten
vanhempi
commit
afcfb65651
1 muutettua tiedostoa jossa 287 lisäystä ja 119 poistoa
  1. 287
    119
      src/broker/modules/matching.go

+ 287
- 119
src/broker/modules/matching.go Näytä tiedosto

@@ -2,155 +2,323 @@ package modules
2 2
 
3 3
 import (
4 4
 	"errors"
5
-	_ "errors"
6
-	"github.com/juliangruber/go-intersect"
7
-	"reflect"
8 5
 )
9 6
 
10
-type match_manager struct {
11
-	match_count int
7
+type sub_manager struct {
8
+	list topicList
9
+
10
+	/* Manage sub# */
11
+	count_sub  int                  // Subscription #
12
+	emptylist  []int                // To administrate sub #
13
+	ip2sub     map[string][]int     // ip2sub[ip] = sub# ...
14
+	sub2ip     map[int]string       // sub2ip[sub#] = ip
15
+	sub2node   map[int][]nodeInfo // sub2node[sub#] = node_addr ...
16
+	israngesub map[int]bool         // To manage when deleted
12 17
 }
13 18
 
14
-// Matching -> Return (list of IP addresses of matched subs, Pub Msg, error)
15
-func (moscato *Moscato) Matching(msg MsgUnit) {
16
-	//msg := moscato.queue.pop(true)
17
-	topic := msg.(PublishMsg).Topic
18
-	value := msg.(PublishMsg).Value
19
-	sub_mng := moscato.SubscriptionManager
19
+type nodeInfo struct{
20
+	valNodeList []*valueNode
21
+	topic []int64
22
+}
20 23
 
21
-	// listß
22
-	ret := make([]string, 0)
24
+func (manager *sub_manager) Initialize() {
25
+	// Some initialize
26
+	manager.ip2sub = make(map[string][]int)
27
+	manager.sub2ip = make(map[int]string)
28
+	manager.sub2node = make(map[int][]nodeInfo)
29
+	manager.israngesub = make(map[int]bool)
30
+}
23 31
 
24
-	// list for matched range subscriptions
25
-	big := make([]int, 0)
26
-	small := make([]int, 0)
32
+func newSubmng() *sub_manager {
33
+	subMng := &sub_manager{}
34
+	subMng.Initialize()
35
+	return subMng
36
+}
27 37
 
28
-	// 1. Find (topicNode[Topic] == msg.Topic) Node
29
-	topicPtr := sub_mng.list
30
-	pos := topicPtr.getTopicNodePos(topic)
38
+func (manager *sub_manager) isDuplicated(msg MsgUnit) bool{
39
+	from := msg.(SubscriptionMsg).From
40
+	topic := msg.(SubscriptionMsg).Topic
41
+	value := msg.(SubscriptionMsg).Value
42
+	operator := msg.(SubscriptionMsg).Operator
43
+	subList := manager.ip2sub[from]
44
+	canFind := false
31 45
 
32
-	// Don't Exist topicNode
33
-	if pos == nil {
34
-		moscato.SendQueue <- myType{nil, msg, errors.New("Don't Exist Matching Topic")}
35
-	} else {
36
-		// 2. Traverse all valueNode -> and Match
37
-		valPtr := pos.list.head
38
-		for valPtr != nil {
39
-			compare := Compare(valPtr.val, value)
40
-			if compare < 0 { // sub.val > pub.val
41
-				// single : { >, >= }
42
-
43
-				// (1) case : >
44
-				for i := 0; i < len(valPtr.single2sub_b); i++ {
45
-					sub := valPtr.single2sub_b[i]
46
-					ip := sub_mng.sub2ip[sub]
47
-					ret = append(ret, ip)
48
-				}
46
+	for i := 0; i < len(subList) && canFind == false ; i++{
47
+		sub := subList[i]
48
+		nodeinfoList := manager.sub2node[sub]
49 49
 
50
-				// (2) case : >=
51
-				for i := 0; i < len(valPtr.single2sub_eb); i++ {
52
-					sub := valPtr.single2sub_eb[i]
53
-					ip := sub_mng.sub2ip[sub]
54
-					ret = append(ret, ip)
55
-				}
56
-				// range : { >, >= }
50
+		for j := 0; j < len(nodeinfoList); j++{
51
+			node := nodeinfoList[j]
57 52
 
58
-				// (1) case : >
59
-				for i := 0; i < len(valPtr.range2sub_b); i++ {
60
-					sub := valPtr.range2sub_b[i]
61
-					big = append(big, sub)
62
-				}
53
+			if Compare(node.topic, topic) == 0 {
54
+				cnt := 0
55
+				if len(operator) == 1{
56
+					valPtr := node.valNodeList[0]
57
+					if Compare(valPtr.val ,value) == 0{
58
+						op := operator[0]
59
+						cnt += findOperatorList(op, valPtr, sub, true)
60
+					}
61
+				} else { // value가 숫자일 때
62
+					leftop := operator[0]
63
+					logicalop := operator[1]
64
+					rightop := operator[2]
63 65
 
64
-				// (2) case : >=
65
-				for i := 0; i < len(valPtr.range2sub_eb); i++ {
66
-					sub := valPtr.range2sub_eb[i]
67
-					big = append(big, sub)
66
+					leftValuePtr := node.valNodeList[0]
67
+					rightValuePtr := node.valNodeList[1]
68
+
69
+					nodeValList := []int64{leftValuePtr.val[0], rightValuePtr.val[0]}
70
+
71
+					if Compare(nodeValList, value) == 0 {
72
+						if logicalop == "&&" {
73
+							cnt += findOperatorList(leftop, leftValuePtr, sub, false)
74
+							cnt += findOperatorList(rightop, rightValuePtr, sub, false)
75
+						} else {
76
+							cnt += findOperatorList(leftop, leftValuePtr, sub, true)
77
+							cnt += findOperatorList(rightop, rightValuePtr, sub, true)
78
+						}
79
+					}
68 80
 				}
81
+				if cnt == len(node.valNodeList){
82
+					canFind = true
83
+					break
84
+				}
85
+			}
86
+		}
87
+	}
88
+	return canFind
89
+}
69 90
 
70
-			} else if compare > 0 { // sub.val < pub.val
91
+// Returns if sub exists in the operator list in the same node as op.
92
+func findOperatorList(op string, node *valueNode, sub int, issingle bool) int {
93
+	ret := -1
94
+	if issingle == true {
95
+		switch op {
96
+		case "<":
97
+			ret = findSub(node.single2sub_s, sub)
98
+		case "<=":
99
+			ret = findSub(node.single2sub_es, sub)
100
+		case ">":
101
+			ret = findSub(node.single2sub_b, sub)
102
+		case ">=":
103
+			ret = findSub(node.single2sub_eb, sub)
104
+		case "==":
105
+			ret = findSub(node.single2sub_e, sub)
106
+		}
107
+	} else {
108
+		switch op {
109
+		case "<":
110
+			ret = findSub(node.range2sub_s, sub)
111
+		case "<=":
112
+			ret = findSub(node.range2sub_es, sub)
113
+		case ">":
114
+			ret = findSub(node.range2sub_b, sub)
115
+		case ">=":
116
+			ret = findSub(node.range2sub_eb, sub)
117
+		}
118
+	}
119
+	if ret < 0{
120
+		return 0
121
+	} else{
122
+		return 1
123
+	}
124
+}
71 125
 
72
-				// single : { <, <= }
126
+//	### To Insert sub#
127
+func (manager *sub_manager) addSubscription(msg MsgUnit) error {
73 128
 
74
-				// (1) case : <
75
-				for i := 0; i < len(valPtr.single2sub_s); i++ {
76
-					sub := valPtr.single2sub_s[i]
77
-					ip := sub_mng.sub2ip[sub]
78
-					ret = append(ret, ip)
79
-				}
129
+	topic := msg.(SubscriptionMsg).Topic
130
+	value := msg.(SubscriptionMsg).Value
131
+	operator := msg.(SubscriptionMsg).Operator
132
+	subnumber := 0
80 133
 
81
-				// (2) case : <=
82
-				for i := 0; i < len(valPtr.single2sub_es); i++ {
83
-					sub := valPtr.single2sub_es[i]
84
-					ip := sub_mng.sub2ip[sub]
85
-					ret = append(ret, ip)
86
-				}
134
+	// 0.  Check the same ip and sub information
135
+	if manager.isDuplicated(msg) == true {
136
+		return errors.New("Duplicater Subscription")
137
+	}
87 138
 
88
-				// range : { <, <= }
139
+	// 1. Mapping incoming IP address to sub #
140
+	if len(manager.emptylist) == 0 {
141
+		subnumber = manager.count_sub
142
+		manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], manager.count_sub)
143
+		manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
144
+		manager.count_sub++
145
+	} else {
146
+		subnumber := manager.emptylist[len(manager.emptylist)-1]
147
+		manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
148
+		manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], subnumber)
149
+		manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
150
+	}
89 151
 
90
-				// (1) case : <
91
-				for i := 0; i < len(valPtr.range2sub_s); i++ {
92
-					sub := valPtr.range2sub_s[i]
93
-					small = append(small, sub)
94
-				}
95
-				// (2) case : <=
96
-				for i := 0; i < len(valPtr.range2sub_es); i++ {
97
-					sub := valPtr.range2sub_es[i]
98
-					small = append(small, sub)
99
-				}
152
+	nameptr := manager.list.head
153
+	findOk := false
100 154
 
101
-			} else { // sub.val == pub.val
155
+	// * 2. Add Subscription
102 156
 
103
-				// single : { <=, >=, ==}
157
+	// Find name in namelist, add if not found
158
+	for nameptr != nil {
159
+		if Compare(nameptr.topic, topic) == 0 {
160
+			findOk = true
161
+			break
162
+		}
163
+		nameptr = nameptr.next
164
+	}
104 165
 
105
-				// (1) case : <=
106
-				for i := 0; i < len(valPtr.single2sub_es); i++ {
107
-					sub := valPtr.single2sub_es[i]
108
-					ip := sub_mng.sub2ip[sub]
109
-					ret = append(ret, ip)
110
-				}
111 166
 
112
-				// (2) case : >=
113
-				for i := 0; i < len(valPtr.single2sub_eb); i++ {
114
-					sub := valPtr.single2sub_eb[i]
115
-					ip := sub_mng.sub2ip[sub]
116
-					ret = append(ret, ip)
117
-				}
167
+	if !findOk {
168
+		manager.list.addTopicNode(topic)
169
+		nameptr = manager.list.tail
170
+	}
118 171
 
119
-				// (3) case : ==
120
-				for i := 0; i < len(valPtr.single2sub_e); i++ {
121
-					sub := valPtr.single2sub_e[i]
122
-					ip := sub_mng.sub2ip[sub]
123
-					ret = append(ret, ip)
124
-				}
172
+	var addValNodeList []*valueNode
125 173
 
126
-				// range : { <=, >= }
174
+	// Add Value to list[name]
175
+	if len(operator) == 1 { // if single expression
176
+		valptr := nameptr.list.getValueNodePos(value)
177
+		if valptr == nil {
178
+			nameptr.list.addValueNode(value)
179
+			valptr = nameptr.list.tail
180
+		}
181
+		addValNodeList = append(addValNodeList, valptr)
182
+		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
183
+		valptr.insertSub(operator[0], subnumber, true)
184
+		return nil // AddSubscription ok
185
+	} else {
186
+		// {">" "&&" "<"}
187
+		// For compound expressions bounded by '&&' and '||'
188
+		// (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
127 189
 
128
-				// (1) case : <=
129
-				for i := 0; i < len(valPtr.range2sub_es); i++ {
130
-					sub := valPtr.range2sub_es[i]
131
-					small = append(small, sub)
132
-				}
190
+		logical_operator := operator[2]
133 191
 
134
-				// (2) case : >=
135
-				for i := 0; i < len(valPtr.range2sub_eb); i++ {
136
-					sub := valPtr.range2sub_eb[i]
137
-					big = append(big, sub)
138
-				}
192
+		// Find ValueNode = (namelist[name].list.val == Value)
193
+		valptr1 := nameptr.list.getValueNodePos([]int64{value[0]})
194
+		valptr2 := nameptr.list.getValueNodePos([]int64{value[1]})
139 195
 
140
-			}
141
-			valPtr = valPtr.next
196
+		if valptr1 == nil {
197
+			nameptr.list.addValueNode([]int64{value[0]})
198
+			valptr1 = nameptr.list.tail
199
+		}
200
+		if valptr2 == nil {
201
+			nameptr.list.addValueNode([]int64{value[1]})
202
+			valptr2 = nameptr.list.tail
142 203
 		}
204
+		addValNodeList = append(addValNodeList, valptr1)
205
+		addValNodeList = append(addValNodeList, valptr2)
206
+		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
207
+		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
143 208
 
144
-		// Add the intersection IP address of two sets (large and small) to the return list
145
-		hash := intersect.Hash(small, big)
146
-		list := reflect.ValueOf(hash)
147
-		for i := 0; i < list.Len(); i++ {
148
-			sub := list.Index(i).Interface().(int)
149
-			ip := sub_mng.sub2ip[sub]
150
-			ret = append(ret, ip)
209
+		if logical_operator == "&&" {
210
+			// If they are enclosed in '&&' -> Insert Value to range_operator_list
211
+			manager.israngesub[subnumber] = true
212
+			valptr1.insertSub(operator[0], subnumber, false)
213
+			valptr2.insertSub(operator[2], subnumber, false)
214
+
215
+		} else {
216
+			// if they are enclosed in '||' -> Insert Value to single_operator_list
217
+			valptr1.insertSub(operator[0], subnumber, true)
218
+			valptr2.insertSub(operator[2], subnumber, true)
151 219
 		}
152 220
 
153
-		moscato.MatchingManager.match_count++
154
-		moscato.SendQueue <- myType{ret, msg, nil}
221
+		return nil // addSubscription ok
155 222
 	}
223
+	return errors.New("Can't addSubscription")
156 224
 }
225
+
226
+// To delete subscription
227
+func (manager *sub_manager) delete(from string) error {
228
+
229
+	ip := from
230
+	cand := manager.ip2sub[ip]
231
+
232
+	for i := 0; i < len(cand); i++ {
233
+
234
+		sub := cand[i]
235
+		for j := 0; j < len(manager.sub2node[sub]); j++ {
236
+			nodeinfo := manager.sub2node[sub][j]
237
+			node := nodeinfo.valNodeList
238
+			if manager.israngesub[sub] {
239
+				for k := 0; k < len(node); k++ {
240
+					pos := findSub(node[k].range2sub_s, sub)
241
+					if pos != -1 {
242
+						node[k].range2sub_s = remove(node[k].range2sub_s, pos)
243
+						manager.emptylist = append(manager.emptylist, sub)
244
+					}
245
+
246
+					pos = findSub(node[k].range2sub_es, sub)
247
+					if pos != -1 {
248
+						node[k].range2sub_es = remove(node[k].range2sub_es, pos)
249
+						manager.emptylist = append(manager.emptylist, sub)
250
+					}
251
+
252
+					pos = findSub(node[k].range2sub_b, sub)
253
+					if pos != -1 {
254
+						node[k].range2sub_b = remove(node[k].range2sub_b, pos)
255
+						manager.emptylist = append(manager.emptylist, sub)
256
+					}
257
+
258
+					pos = findSub(node[k].range2sub_eb, sub)
259
+					if pos != -1 {
260
+						node[k].range2sub_eb = remove(node[k].range2sub_eb, pos)
261
+						manager.emptylist = append(manager.emptylist, sub)
262
+					}
263
+
264
+					isempty := node[k].isEmpty()
265
+
266
+					// Delete if Value Node is empty
267
+					if isempty && node[k] != nil {
268
+						prev_node := node[k].prev
269
+						next_node := node[k].next
270
+
271
+						prev_node.next = node[k].next
272
+						next_node.prev = node[k].prev
273
+					}
274
+				}
275
+			} else {
276
+				for k := 0; k < len(node); k++ {
277
+					pos := findSub(node[k].single2sub_s, sub)
278
+					if pos != -1 {
279
+						node[k].single2sub_s = remove(node[k].single2sub_s, pos)
280
+						manager.emptylist = append(manager.emptylist, sub)
281
+					}
282
+
283
+					pos = findSub(node[k].single2sub_es, sub)
284
+					if pos != -1 {
285
+						node[k].single2sub_es = remove(node[k].single2sub_es, pos)
286
+						manager.emptylist = append(manager.emptylist, sub)
287
+					}
288
+
289
+					pos = findSub(node[k].single2sub_b, sub)
290
+					if pos != -1 {
291
+						node[k].single2sub_b = remove(node[k].single2sub_b, pos)
292
+						manager.emptylist = append(manager.emptylist, sub)
293
+					}
294
+
295
+					pos = findSub(node[k].single2sub_eb, sub)
296
+					if pos != -1 {
297
+						node[k].single2sub_eb = remove(node[k].single2sub_eb, pos)
298
+						manager.emptylist = append(manager.emptylist, sub)
299
+					}
300
+
301
+					pos = findSub(node[k].single2sub_e, sub)
302
+					if pos != -1 {
303
+						node[k].single2sub_e = remove(node[k].single2sub_e, pos)
304
+						manager.emptylist = append(manager.emptylist, sub)
305
+					}
306
+
307
+					isempty := node[k].isEmpty()
308
+
309
+					// Delete if Value Node is empty
310
+					if isempty && node[k] != nil {
311
+						prevNode := node[k].prev
312
+						nextNode := node[k].next
313
+
314
+						prevNode.next = node[k].next
315
+						nextNode.prev = node[k].prev
316
+					}
317
+				}
318
+			}
319
+		}
320
+		manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
321
+		return nil
322
+	}
323
+	return errors.New("Don't Exist Subscription to delete")
324
+}

Loading…
Peruuta
Tallenna