Bladeren bron

[feat] Add method to check same ip and sub information

master
extra1563 4 jaren geleden
bovenliggende
commit
ae6ff9e669
1 gewijzigde bestanden met toevoegingen van 198 en 92 verwijderingen
  1. 198
    92
      src/broker/modules/subscription.go

+ 198
- 92
src/broker/modules/subscription.go Bestand weergeven

@@ -12,15 +12,20 @@ type sub_manager struct {
12 12
 	emptylist  []int                // To administrate sub #
13 13
 	ip2sub     map[string][]int     // ip2sub[ip] = sub# ...
14 14
 	sub2ip     map[int]string       // sub2ip[sub#] = ip
15
-	sub2node   map[int][]*valueNode // sub2node[sub#] = node_addr ...
15
+	sub2node   map[int][]nodeInfo // sub2node[sub#] = node_addr ...
16 16
 	israngesub map[int]bool         // To manage when deleted
17 17
 }
18 18
 
19
+type nodeInfo struct{
20
+	valNodeList []*valueNode
21
+	topic []int64
22
+}
23
+
19 24
 func (manager *sub_manager) Initialize() {
20 25
 	// Some initialize
21 26
 	manager.ip2sub = make(map[string][]int)
22 27
 	manager.sub2ip = make(map[int]string)
23
-	manager.sub2node = make(map[int][]*valueNode)
28
+	manager.sub2node = make(map[int][]nodeInfo)
24 29
 	manager.israngesub = make(map[int]bool)
25 30
 }
26 31
 
@@ -30,6 +35,94 @@ func newSubmng() *sub_manager {
30 35
 	return subMng
31 36
 }
32 37
 
38
+func (manager *sub_manager) isDuplicated(msg MsgUnit) bool{
39
+	from := msg.(SubscriptionMsg).From
40
+	topic := msg.(SubscriptionMsg).Topic
41
+	value := msg.(SubscriptionMsg).Value
42
+	operator := msg.(SubscriptionMsg).Operator
43
+	subList := manager.ip2sub[from]
44
+	canFind := false
45
+
46
+	for i := 0; i < len(subList) && canFind == false ; i++{
47
+		sub := subList[i]
48
+		nodeinfoList := manager.sub2node[sub]
49
+
50
+		for j := 0; j < len(nodeinfoList); j++{
51
+			node := nodeinfoList[j]
52
+
53
+			if Compare(node.topic, topic) == 0 {
54
+				cnt := 0
55
+				if len(operator) == 1{
56
+					valPtr := node.valNodeList[0]
57
+					if Compare(valPtr.val ,value) == 0{
58
+						op := operator[0]
59
+						cnt += findOperatorList(op, valPtr, sub, true)
60
+					}
61
+				} else { // value가 숫자일 때
62
+					leftop := operator[0]
63
+					logicalop := operator[1]
64
+					rightop := operator[2]
65
+
66
+					leftValuePtr := node.valNodeList[0]
67
+					rightValuePtr := node.valNodeList[1]
68
+
69
+					nodeValList := []int64{leftValuePtr.val[0], rightValuePtr.val[0]}
70
+
71
+					if Compare(nodeValList, value) == 0 {
72
+						if logicalop == "&&" {
73
+							cnt += findOperatorList(leftop, leftValuePtr, sub, false)
74
+							cnt += findOperatorList(rightop, rightValuePtr, sub, false)
75
+						} else {
76
+							cnt += findOperatorList(leftop, leftValuePtr, sub, true)
77
+							cnt += findOperatorList(rightop, rightValuePtr, sub, true)
78
+						}
79
+					}
80
+				}
81
+				if cnt == len(node.valNodeList){
82
+					canFind = true
83
+					break
84
+				}
85
+			}
86
+		}
87
+	}
88
+	return canFind
89
+}
90
+
91
+// op와 같은 node내의 operator리스트에서 sub가 있다면 리턴
92
+func findOperatorList(op string, node *valueNode, sub int, issingle bool) int {
93
+	ret := -1
94
+	if issingle == true {
95
+		switch op {
96
+		case "<":
97
+			ret = findSub(node.single2sub_s, sub)
98
+		case "<=":
99
+			ret = findSub(node.single2sub_es, sub)
100
+		case ">":
101
+			ret = findSub(node.single2sub_b, sub)
102
+		case ">=":
103
+			ret = findSub(node.single2sub_eb, sub)
104
+		case "==":
105
+			ret = findSub(node.single2sub_e, sub)
106
+		}
107
+	} else {
108
+		switch op {
109
+		case "<":
110
+			ret = findSub(node.range2sub_s, sub)
111
+		case "<=":
112
+			ret = findSub(node.range2sub_es, sub)
113
+		case ">":
114
+			ret = findSub(node.range2sub_b, sub)
115
+		case ">=":
116
+			ret = findSub(node.range2sub_eb, sub)
117
+		}
118
+	}
119
+	if ret < 0{
120
+		return 0
121
+	} else{
122
+		return 1
123
+	}
124
+}
125
+
33 126
 //	### To Insert sub#
34 127
 func (manager *sub_manager) addSubscription(msg MsgUnit) error {
35 128
 
@@ -37,8 +130,13 @@ func (manager *sub_manager) addSubscription(msg MsgUnit) error {
37 130
 	value := msg.(SubscriptionMsg).Value
38 131
 	operator := msg.(SubscriptionMsg).Operator
39 132
 	subnumber := 0
40
-	// * 1. Mapping incoming IP address to sub #
41 133
 
134
+	// 0. Check if same IP & same <Topic, Value> exists
135
+	if manager.isDuplicated(msg) == true {
136
+		return errors.New("Duplicater Subscription")
137
+	}
138
+
139
+	// 1. Mapping incoming IP address to sub #
42 140
 	if len(manager.emptylist) == 0 {
43 141
 		subnumber = manager.count_sub
44 142
 		manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], manager.count_sub)
@@ -65,11 +163,14 @@ func (manager *sub_manager) addSubscription(msg MsgUnit) error {
65 163
 		nameptr = nameptr.next
66 164
 	}
67 165
 
166
+
68 167
 	if !findOk {
69 168
 		manager.list.addTopicNode(topic)
70 169
 		nameptr = manager.list.tail
71 170
 	}
72 171
 
172
+	var addValNodeList []*valueNode
173
+
73 174
 	// Add Value to list[name]
74 175
 	if len(operator) == 1 { // if single expression
75 176
 		valptr := nameptr.list.getValueNodePos(value)
@@ -77,15 +178,16 @@ func (manager *sub_manager) addSubscription(msg MsgUnit) error {
77 178
 			nameptr.list.addValueNode(value)
78 179
 			valptr = nameptr.list.tail
79 180
 		}
80
-
81
-		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr)
181
+		addValNodeList = append(addValNodeList, valptr)
182
+		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
82 183
 		valptr.insertSub(operator[0], subnumber, true)
83 184
 		return nil // AddSubscription ok
84 185
 	} else {
85
-
186
+		// {">" "&&" "<"}
86 187
 		// For compound expressions bounded by '&&' and '||'
87 188
 		// (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
88
-		logical_operator := operator[1]
189
+
190
+		logical_operator := operator[2]
89 191
 
90 192
 		// Find ValueNode = (namelist[name].list.val == Value)
91 193
 		valptr1 := nameptr.list.getValueNodePos([]int64{value[0]})
@@ -99,9 +201,10 @@ func (manager *sub_manager) addSubscription(msg MsgUnit) error {
99 201
 			nameptr.list.addValueNode([]int64{value[1]})
100 202
 			valptr2 = nameptr.list.tail
101 203
 		}
102
-
103
-		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr1)
104
-		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr2)
204
+		addValNodeList = append(addValNodeList, valptr1)
205
+		addValNodeList = append(addValNodeList, valptr2)
206
+		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
207
+		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
105 208
 
106 209
 		if logical_operator == "&&" {
107 210
 			// If they are enclosed in '&&' -> Insert Value to range_operator_list
@@ -129,90 +232,93 @@ func (manager *sub_manager) delete(from string) error {
129 232
 	for i := 0; i < len(cand); i++ {
130 233
 
131 234
 		sub := cand[i]
132
-		node := manager.sub2node[sub]
133
-
134
-		if manager.israngesub[sub] {
135
-			for j := 0; j < len(node); j++ {
136
-				pos := findSub(node[j].range2sub_s, sub)
137
-				if pos != -1 {
138
-					node[j].range2sub_s = remove(node[j].range2sub_s, pos)
139
-					manager.emptylist = append(manager.emptylist, sub)
140
-				}
141
-
142
-				pos = findSub(node[j].range2sub_es, sub)
143
-				if pos != -1 {
144
-					node[j].range2sub_es = remove(node[j].range2sub_es, pos)
145
-					manager.emptylist = append(manager.emptylist, sub)
146
-				}
147
-
148
-				pos = findSub(node[j].range2sub_b, sub)
149
-				if pos != -1 {
150
-					node[j].range2sub_b = remove(node[j].range2sub_b, pos)
151
-					manager.emptylist = append(manager.emptylist, sub)
152
-				}
153
-
154
-				pos = findSub(node[j].range2sub_eb, sub)
155
-				if pos != -1 {
156
-					node[j].range2sub_eb = remove(node[j].range2sub_eb, pos)
157
-					manager.emptylist = append(manager.emptylist, sub)
158
-				}
159
-
160
-				isempty := node[j].isEmpty()
161
-
162
-				// Delete if Value Node is empty
163
-				if isempty && node[j] != nil {
164
-					prev_node := node[j].prev
165
-					next_node := node[j].next
166
-
167
-					prev_node.next = node[j].next
168
-					next_node.prev = node[j].prev
169
-				}
170
-			}
171
-		} else {
172
-			for j := 0; j < len(node); j++ {
173
-				pos := findSub(node[j].single2sub_s, sub)
174
-				if pos != -1 {
175
-					node[j].single2sub_s = remove(node[j].single2sub_s, pos)
176
-					manager.emptylist = append(manager.emptylist, sub)
177
-				}
178
-
179
-				pos = findSub(node[j].single2sub_es, sub)
180
-				if pos != -1 {
181
-					node[j].single2sub_es = remove(node[j].single2sub_es, pos)
182
-					manager.emptylist = append(manager.emptylist, sub)
235
+		for j := 0; j < len(manager.sub2node[sub]); j++ {
236
+			nodeinfo := manager.sub2node[sub][j]
237
+			node := nodeinfo.valNodeList
238
+			if manager.israngesub[sub] {
239
+				for k := 0; k < len(node); k++ {
240
+					pos := findSub(node[k].range2sub_s, sub)
241
+					if pos != -1 {
242
+						node[k].range2sub_s = remove(node[k].range2sub_s, pos)
243
+						manager.emptylist = append(manager.emptylist, sub)
244
+					}
245
+
246
+					pos = findSub(node[k].range2sub_es, sub)
247
+					if pos != -1 {
248
+						node[k].range2sub_es = remove(node[k].range2sub_es, pos)
249
+						manager.emptylist = append(manager.emptylist, sub)
250
+					}
251
+
252
+					pos = findSub(node[k].range2sub_b, sub)
253
+					if pos != -1 {
254
+						node[k].range2sub_b = remove(node[k].range2sub_b, pos)
255
+						manager.emptylist = append(manager.emptylist, sub)
256
+					}
257
+
258
+					pos = findSub(node[k].range2sub_eb, sub)
259
+					if pos != -1 {
260
+						node[k].range2sub_eb = remove(node[k].range2sub_eb, pos)
261
+						manager.emptylist = append(manager.emptylist, sub)
262
+					}
263
+
264
+					isempty := node[k].isEmpty()
265
+
266
+					// Delete if Value Node is empty
267
+					if isempty && node[k] != nil {
268
+						prev_node := node[k].prev
269
+						next_node := node[k].next
270
+
271
+						prev_node.next = node[k].next
272
+						next_node.prev = node[k].prev
273
+					}
183 274
 				}
184
-
185
-				pos = findSub(node[j].single2sub_b, sub)
186
-				if pos != -1 {
187
-					node[j].single2sub_b = remove(node[j].single2sub_b, pos)
188
-					manager.emptylist = append(manager.emptylist, sub)
189
-				}
190
-
191
-				pos = findSub(node[j].single2sub_eb, sub)
192
-				if pos != -1 {
193
-					node[j].single2sub_eb = remove(node[j].single2sub_eb, pos)
194
-					manager.emptylist = append(manager.emptylist, sub)
195
-				}
196
-
197
-				pos = findSub(node[j].single2sub_e, sub)
198
-				if pos != -1 {
199
-					node[j].single2sub_e = remove(node[j].single2sub_e, pos)
200
-					manager.emptylist = append(manager.emptylist, sub)
201
-				}
202
-
203
-				isempty := node[j].isEmpty()
204
-
205
-				// Delete if Value Node is empty
206
-				if isempty && node[j] != nil {
207
-					prevNode := node[j].prev
208
-					nextNode := node[j].next
209
-
210
-					prevNode.next = node[j].next
211
-					nextNode.prev = node[j].prev
275
+			} else {
276
+				for k := 0; k < len(node); k++ {
277
+					pos := findSub(node[k].single2sub_s, sub)
278
+					if pos != -1 {
279
+						node[k].single2sub_s = remove(node[k].single2sub_s, pos)
280
+						manager.emptylist = append(manager.emptylist, sub)
281
+					}
282
+
283
+					pos = findSub(node[k].single2sub_es, sub)
284
+					if pos != -1 {
285
+						node[k].single2sub_es = remove(node[k].single2sub_es, pos)
286
+						manager.emptylist = append(manager.emptylist, sub)
287
+					}
288
+
289
+					pos = findSub(node[k].single2sub_b, sub)
290
+					if pos != -1 {
291
+						node[k].single2sub_b = remove(node[k].single2sub_b, pos)
292
+						manager.emptylist = append(manager.emptylist, sub)
293
+					}
294
+
295
+					pos = findSub(node[k].single2sub_eb, sub)
296
+					if pos != -1 {
297
+						node[k].single2sub_eb = remove(node[k].single2sub_eb, pos)
298
+						manager.emptylist = append(manager.emptylist, sub)
299
+					}
300
+
301
+					pos = findSub(node[k].single2sub_e, sub)
302
+					if pos != -1 {
303
+						node[k].single2sub_e = remove(node[k].single2sub_e, pos)
304
+						manager.emptylist = append(manager.emptylist, sub)
305
+					}
306
+
307
+					isempty := node[k].isEmpty()
308
+
309
+					// Delete if Value Node is empty
310
+					if isempty && node[k] != nil {
311
+						prevNode := node[k].prev
312
+						nextNode := node[k].next
313
+
314
+						prevNode.next = node[k].next
315
+						nextNode.prev = node[k].prev
316
+					}
212 317
 				}
213 318
 			}
214 319
 		}
320
+		manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
321
+		return nil
215 322
 	}
216
-	manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
217
-	return nil
218
-}
323
+	return errors.New("Don't Exist Subscription to delete")
324
+}

Laden…
Annuleren
Opslaan