瀏覽代碼

[fin] matching

master
extra1563 4 年之前
父節點
當前提交
f2c35eb324
共有 1 個檔案被更改,包括 139 行新增7 行删除
  1. 139
    7
      src/broker/modules/matching.go

+ 139
- 7
src/broker/modules/matching.go 查看文件

@@ -1,21 +1,153 @@
1 1
 package modules
2 2
 
3 3
 import (
4
+	"errors"
4 5
 	_ "errors"
5
-	_ "fmt"
6
+	"github.com/juliangruber/go-intersect"
6 7
 )
7 8
 
8 9
 type match_manager struct {
10
+	match_count	int
9 11
 }
10 12
 
11
-func (match_mng *match_manager) matching(queue *MsgQueue) error {
13
+// Returns a list of IP addresses of matched subs
14
+func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager) ([]string, MsgUnit, error) {
12 15
 	msg := queue.pop(true)
16
+	topic := msg.(*PublishMsg).topic
17
+	value := msg.(*PublishMsg).value
13 18
 
14
-	//Implement here ~~
15
-	subscription := msg.(*PublishMsg).subscription
16
-	content := msg.(*PublishMsg).content
19
+	// list
20
+	ret := make([]string, 0)
17 21
 
18
-	println(subscription, content)
22
+	// list for matched range subscriptions
23
+	big := make([]int, 0)
24
+	small := make([]int, 0)
19 25
 
20
-	return nil
26
+	// 1. Find (topicNode[topic] == msg.topic) Node
27
+	topicPtr := sub_mng.list
28
+	pos := topicPtr.getTopicNodePos(topic)
29
+
30
+	// Don't Exist topicNode
31
+	if pos == nil{
32
+		return nil,nil,errors.New("Don't Exist topicNode")
33
+	}
34
+
35
+	// 2. Traverse all valueNode -> and Match
36
+	valPtr := pos.list.head
37
+	for valPtr != nil{
38
+		compare := Compare(valPtr.val, value)
39
+		if compare < 0 { // sub.val > pub.val
40
+		// single : { >, >= }
41
+
42
+			// (1) case : >
43
+			for i := 0; i < len(valPtr.single2sub_b); i++{
44
+				sub := valPtr.single2sub_b[i]
45
+				ip := sub_mng.sub2ip[sub]
46
+				ret = append(ret, ip)
47
+			}
48
+
49
+			// (2) case : >=
50
+			for i := 0; i < len(valPtr.single2sub_eb); i++{
51
+				sub := valPtr.single2sub_eb[i]
52
+				ip := sub_mng.sub2ip[sub]
53
+				ret = append(ret, ip)
54
+			}
55
+		// range : { >, >= }
56
+
57
+			// (1) case : >
58
+			for i := 0; i < len(valPtr.range2sub_b); i++{
59
+				sub := valPtr.range2sub_b[i]
60
+				big = append(big, sub)
61
+			}
62
+
63
+			// (2) case : >=
64
+			for i := 0; i < len(valPtr.range2sub_eb); i++{
65
+				sub := valPtr.range2sub_eb[i]
66
+				big = append(big, sub)
67
+			}
68
+
69
+		} else if compare > 0 { // sub.val < pub.val
70
+
71
+		// single : { <, <= }
72
+
73
+			// (1) case : <
74
+			for i := 0; i < len(valPtr.single2sub_s); i++{
75
+				sub := valPtr.single2sub_s[i]
76
+				ip := sub_mng.sub2ip[sub]
77
+				ret = append(ret, ip)
78
+			}
79
+
80
+			// (2) case : <=
81
+			for i := 0; i < len(valPtr.single2sub_es); i++{
82
+				sub := valPtr.single2sub_es[i]
83
+				ip := sub_mng.sub2ip[sub]
84
+				ret = append(ret, ip)
85
+			}
86
+
87
+		// range : { <, <= }
88
+
89
+			// (1) case : <
90
+			for i := 0; i < len(valPtr.range2sub_s); i++{
91
+				sub := valPtr.range2sub_s[i]
92
+				small = append(small, sub)
93
+			}
94
+			// (2) case : <=
95
+			for i := 0; i < len(valPtr.range2sub_es); i++{
96
+				sub := valPtr.range2sub_es[i]
97
+				small = append(small, sub)
98
+			}
99
+
100
+		} else{ // sub.val == pub.val
101
+
102
+		// single : { <=, >=, ==}
103
+
104
+			// (1) case : <=
105
+			for i := 0; i < len(valPtr.single2sub_es); i++{
106
+				sub := valPtr.single2sub_es[i]
107
+				ip := sub_mng.sub2ip[sub]
108
+				ret = append(ret, ip)
109
+			}
110
+
111
+			// (2) case : >=
112
+			for i := 0; i < len(valPtr.single2sub_eb); i++{
113
+				sub := valPtr.single2sub_eb[i]
114
+				ip := sub_mng.sub2ip[sub]
115
+				ret = append(ret, ip)
116
+			}
117
+
118
+			// (3) case : ==
119
+			for i := 0; i < len(valPtr.single2sub_e); i++{
120
+				sub := valPtr.single2sub_e[i]
121
+				ip := sub_mng.sub2ip[sub]
122
+				ret = append(ret, ip)
123
+			}
124
+
125
+		// range : { <=, >= }
126
+
127
+			// (1) case : <=
128
+			for i := 0; i < len(valPtr.range2sub_es); i++{
129
+				sub := valPtr.range2sub_es[i]
130
+				small = append(small, sub)
131
+			}
132
+
133
+			// (2) case : >=
134
+			for i := 0; i < len(valPtr.range2sub_eb); i++{
135
+				sub := valPtr.range2sub_eb[i]
136
+				big = append(big, sub)
137
+			}
138
+
139
+		}
140
+	}
141
+
142
+	// Add the intersection IP address of two sets (large and small) to the return list
143
+	hash := intersect.Hash(small, big)
144
+	for i := 0; i < len(hash); i++{
145
+		sub := hash[i].(int)
146
+		ip := sub_mng.sub2ip[sub]
147
+		ret = append(ret, ip)
148
+	}
149
+
150
+	match_mng.match_count++
151
+
152
+	return ret, msg, nil
21 153
 }

Loading…
取消
儲存