Kaynağa Gözat

[revise] RPC가 가능하도록 init.go 와 message 객체 수정

master
kidjung 4 yıl önce
ebeveyn
işleme
f4ad37e461

+ 5
- 0
.idea/vcs.xml Dosyayı Görüntüle

2
 <project version="4">
2
 <project version="4">
3
   <component name="VcsDirectoryMappings">
3
   <component name="VcsDirectoryMappings">
4
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
4
     <mapping directory="$PROJECT_DIR$" vcs="Git" />
5
+    <mapping directory="$PROJECT_DIR$/src/github.com/bmizerany/assert" vcs="Git" />
5
     <mapping directory="$PROJECT_DIR$/src/github.com/davecgh/go-spew" vcs="Git" />
6
     <mapping directory="$PROJECT_DIR$/src/github.com/davecgh/go-spew" vcs="Git" />
7
+    <mapping directory="$PROJECT_DIR$/src/github.com/juliangruber/go-intersect" vcs="Git" />
8
+    <mapping directory="$PROJECT_DIR$/src/github.com/kr/pretty" vcs="Git" />
9
+    <mapping directory="$PROJECT_DIR$/src/github.com/kr/text" vcs="Git" />
6
     <mapping directory="$PROJECT_DIR$/src/github.com/pmezard/go-difflib" vcs="Git" />
10
     <mapping directory="$PROJECT_DIR$/src/github.com/pmezard/go-difflib" vcs="Git" />
11
+    <mapping directory="$PROJECT_DIR$/src/github.com/rogpeppe/go-internal" vcs="Git" />
7
     <mapping directory="$PROJECT_DIR$/src/github.com/stretchr/testify" vcs="Git" />
12
     <mapping directory="$PROJECT_DIR$/src/github.com/stretchr/testify" vcs="Git" />
8
     <mapping directory="$PROJECT_DIR$/src/gopkg.in/yaml.v3" vcs="Git" />
13
     <mapping directory="$PROJECT_DIR$/src/gopkg.in/yaml.v3" vcs="Git" />
9
   </component>
14
   </component>

+ 51
- 21
src/broker/modules/init.go Dosyayı Görüntüle

1
 package modules
1
 package modules
2
 
2
 
3
 import (
3
 import (
4
+	"encoding/json"
4
 	"errors"
5
 	"errors"
5
 	"fmt"
6
 	"fmt"
6
 	"log"
7
 	"log"
9
 )
10
 )
10
 
11
 
11
 //temporary type for matching manager
12
 //temporary type for matching manager
12
-type match_manager struct{}
13
+//type match_manager struct{}
13
 
14
 
14
 func (match_mng *match_manager) matching(queue *MsgQueue) {
15
 func (match_mng *match_manager) matching(queue *MsgQueue) {
15
 	//msg := queue.pop(true)
16
 	//msg := queue.pop(true)
31
 }
32
 }
32
 
33
 
33
 type Reply struct {
34
 type Reply struct {
34
-	ReceiveMsg MsgUnit
35
+	CompleteLog string
35
 }
36
 }
36
 
37
 
37
-type Listner int
38
-
38
+type Receiver struct {
39
+	moscato *Moscato
40
+}
39
 
41
 
40
 //MS에서 보내는거-rpc call 사용
42
 //MS에서 보내는거-rpc call 사용
41
 /*func (l *Listner)MM_Receive(msg MsgUnit, reply *Reply)error{
43
 /*func (l *Listner)MM_Receive(msg MsgUnit, reply *Reply)error{
44
 	return nil
46
 	return nil
45
 }*/
47
 }*/
46
 
48
 
47
-func (moscato *Moscato)MM_Receive(msg MsgUnit, reply *Reply)error{
48
-	reply.ReceiveMsg=msg
49
-  	go moscato.Receive(reply)
49
+type Args struct { // 매개변수
50
+	JsonMsg []byte
51
+	Kind    int
52
+}
53
+
54
+func (receiver Receiver) MM_Receive(args Args, reply *Reply) error {
55
+	// 메세지 별로 나눠서 언마샬하면 됨
56
+
57
+	var msg PublishMsg
58
+
59
+	//msg
60
+	err := json.Unmarshal(args.JsonMsg, &msg)
61
+	if err != nil {
62
+		return err
63
+	}
64
+	go func() {
65
+		_, err := receiver.moscato.Receive(msg)
66
+		if err != nil {
67
+
68
+		}
69
+	}()
70
+	reply.CompleteLog = "received completely"
50
 	return nil
71
 	return nil
51
 }
72
 }
52
 
73
 
53
 //Recieve - MM가 msg전달 받음
74
 //Recieve - MM가 msg전달 받음
54
-func (moscato *Moscato)Receive(reply *Reply) (MsgUnit, error) {
75
+func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
55
 	//rpc call
76
 	//rpc call
56
-	var msg_type = reply.ReceiveMsg.CheckType()
77
+	var msg_type = msg.CheckType()
57
 	//메세지 타입에 따라 다르게 처리
78
 	//메세지 타입에 따라 다르게 처리
58
 	switch msg_type {
79
 	switch msg_type {
59
 
80
 
60
 	case KSM: //Key share msg
81
 	case KSM: //Key share msg
61
 
82
 
62
 	case PM: //Publish msg
83
 	case PM: //Publish msg
63
-		moscato.queue.push(reply.ReceiveMsg.(*PublishMsg))
84
+		moscato.queue.push(msg.(PublishMsg))
85
+		log.Println("popped queue: ", moscato.queue.pop(false))
64
 
86
 
65
 	case SM: //Subscription msg
87
 	case SM: //Subscription msg
66
-		moscato.match_mng.add_subscription(reply.ReceiveMsg.(*SubscriptionMsg))
88
+		moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
67
 
89
 
68
 	case RM: //Register msg
90
 	case RM: //Register msg
69
 		//var newmsg RegisterMsg
91
 		//var newmsg RegisterMsg
70
-		var newmsg = reply.ReceiveMsg.(*RegisterMsg)
71
-		newNode := MSnode{newmsg.From(), newmsg.From()}
92
+		var newmsg = msg.(*RegisterMsg)
93
+		newNode := MSnode{newmsg.From, newmsg.From}
72
 		moscato.ms_mng.add_microservice(newNode)
94
 		moscato.ms_mng.add_microservice(newNode)
73
 
95
 
74
 	case WM: //Withdraw msg
96
 	case WM: //Withdraw msg
75
-		moscato.ms_mng.remove_microservice(reply.ReceiveMsg.(*WithdrawMsg).From())
97
+		moscato.ms_mng.remove_microservice(msg.(*WithdrawMsg).From)
76
 
98
 
77
 	default:
99
 	default:
78
 		return nil, errors.New("Message type Error: Not registered message type")
100
 		return nil, errors.New("Message type Error: Not registered message type")
79
 	}
101
 	}
80
 
102
 
81
-	return reply.ReceiveMsg, nil
103
+	return msg, nil
82
 }
104
 }
83
 
105
 
84
-
85
 func (moscato *Moscato) Run() {
106
 func (moscato *Moscato) Run() {
86
 	//모스카토 구조체 변수 초기화
107
 	//모스카토 구조체 변수 초기화
108
+	receiver := Receiver{moscato: moscato}
87
 	moscato.queue.queue_init()
109
 	moscato.queue.queue_init()
88
 
110
 
89
 	//go routine -> matching 동작
111
 	//go routine -> matching 동작
90
 	go moscato.match_mng.matching(&moscato.queue)
112
 	go moscato.match_mng.matching(&moscato.queue)
91
 
113
 
92
-	//rpc 등록 -> Receive함수
93
-	rpc.Register(moscato)
94
-	moscato.Listen()
114
+	//rpc 등록 -> Receive 함수
115
+	err := rpc.Register(receiver)
116
+	if err != nil {
117
+		fmt.Println("fuck")
118
+		log.Println(err)
119
+		return
120
+	}
121
+	Listen()
122
+	log.Println("listen complete.")
123
+	fmt.Scanln()
95
 }
124
 }
96
 
125
 
97
-func (moscato *Moscato) Listen() {
98
-	l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
126
+func Listen() {
127
+	//l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
128
+	l, err := net.Listen("tcp", "0.0.0.0:8160")
99
 
129
 
100
 	if err != nil {
130
 	if err != nil {
101
 		log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err))
131
 		log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err))

+ 19
- 19
src/broker/modules/list.go Dosyayı Görüntüle

22
 }
22
 }
23
 
23
 
24
 type valueNode struct {
24
 type valueNode struct {
25
-	val  []int64 // Encrypted topic
25
+	val  []int64 // Encrypted Topic
26
 	next *valueNode
26
 	next *valueNode
27
 	prev *valueNode
27
 	prev *valueNode
28
 
28
 
45
 	return append(ary[:i], ary[i+1:]...)
45
 	return append(ary[:i], ary[i+1:]...)
46
 }
46
 }
47
 
47
 
48
-// To find a specific sub# in a value node
48
+// To find a specific sub# in a Value node
49
 func findSub(ary []int, sub int) int {
49
 func findSub(ary []int, sub int) int {
50
 	for i := 0; i < len(ary); i++ {
50
 	for i := 0; i < len(ary); i++ {
51
 		if ary[i] == sub {
51
 		if ary[i] == sub {
55
 	return -1
55
 	return -1
56
 }
56
 }
57
 
57
 
58
-// To add a topic node to the topic list
59
-func (l *topicList) addTopicNode(topic []int64){
58
+// To add a Topic node to the Topic list
59
+func (l *topicList) addTopicNode(topic []int64) {
60
 	newNode := &topicNode{topic, nil, nil, valueList{}}
60
 	newNode := &topicNode{topic, nil, nil, valueList{}}
61
-	if l.head == nil{
61
+	if l.head == nil {
62
 		l.head = newNode
62
 		l.head = newNode
63
 		l.tail = l.head
63
 		l.tail = l.head
64
-	} else{
64
+	} else {
65
 		newNode.prev = l.tail
65
 		newNode.prev = l.tail
66
 		l.tail.next = newNode
66
 		l.tail.next = newNode
67
 		l.tail = newNode
67
 		l.tail = newNode
69
 	l.size++
69
 	l.size++
70
 }
70
 }
71
 
71
 
72
-// To check if a specific value node is empty
72
+// To check if a specific Value node is empty
73
 func (n *valueNode) isEmpty() bool {
73
 func (n *valueNode) isEmpty() bool {
74
 	empty := true
74
 	empty := true
75
 	if len(n.single2sub_s) != 0 {
75
 	if len(n.single2sub_s) != 0 {
102
 	return empty
102
 	return empty
103
 }
103
 }
104
 
104
 
105
-// To insert sub# into the operator list of the value node
105
+// To insert sub# into the operator list of the Value node
106
 func (n *valueNode) insertSub(op string, sub int, issingle bool) {
106
 func (n *valueNode) insertSub(op string, sub int, issingle bool) {
107
 	if issingle == true {
107
 	if issingle == true {
108
 		switch op {
108
 		switch op {
131
 	}
131
 	}
132
 }
132
 }
133
 
133
 
134
-// To add a value node to the value list
134
+// To add a Value node to the Value list
135
 func (l *valueList) addValueNode(value []int64) {
135
 func (l *valueList) addValueNode(value []int64) {
136
 	newValNode := &valueNode{value, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}
136
 	newValNode := &valueNode{value, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}
137
-	if l.head == nil{
137
+	if l.head == nil {
138
 		l.head = newValNode
138
 		l.head = newValNode
139
 		l.tail = l.head
139
 		l.tail = l.head
140
-	} else{
140
+	} else {
141
 		newValNode.prev = l.tail
141
 		newValNode.prev = l.tail
142
 		l.tail.next = newValNode
142
 		l.tail.next = newValNode
143
 		l.tail = newValNode
143
 		l.tail = newValNode
145
 	l.size++
145
 	l.size++
146
 }
146
 }
147
 
147
 
148
-// To get the position of a topic node with a specific topic
148
+// To get the position of a Topic node with a specific Topic
149
 func (l *topicList) getTopicNodePos(topic []int64) *topicNode {
149
 func (l *topicList) getTopicNodePos(topic []int64) *topicNode {
150
 	topicPtr := l.head
150
 	topicPtr := l.head
151
 	for topicPtr != nil {
151
 	for topicPtr != nil {
152
-		if len(topicPtr.topic) == 0{
152
+		if len(topicPtr.topic) == 0 {
153
 			topicPtr = topicPtr.next
153
 			topicPtr = topicPtr.next
154
 			continue
154
 			continue
155
 		}
155
 		}
161
 	return nil
161
 	return nil
162
 }
162
 }
163
 
163
 
164
-// To get the position of a value node with a specific value
164
+// To get the position of a Value node with a specific Value
165
 func (l *valueList) getValueNodePos(value []int64) *valueNode {
165
 func (l *valueList) getValueNodePos(value []int64) *valueNode {
166
 	valPtr := l.head
166
 	valPtr := l.head
167
 	for valPtr != nil {
167
 	for valPtr != nil {
168
-		if len(valPtr.val) == 0{
168
+		if len(valPtr.val) == 0 {
169
 			valPtr = valPtr.next
169
 			valPtr = valPtr.next
170
 			continue
170
 			continue
171
 		}
171
 		}
181
 func Compare(a []int64, b []int64) int {
181
 func Compare(a []int64, b []int64) int {
182
 	if len(a) < len(b) {
182
 	if len(a) < len(b) {
183
 		return 1
183
 		return 1
184
-	} else if len(a) > len(b){
184
+	} else if len(a) > len(b) {
185
 		return -1
185
 		return -1
186
-	} else{
187
-		for i := 0; i < len(a); i++{
186
+	} else {
187
+		for i := 0; i < len(a); i++ {
188
 			if a[i] < b[i] {
188
 			if a[i] < b[i] {
189
 				return 1
189
 				return 1
190
-			} else if a[i] > b[i]{
190
+			} else if a[i] > b[i] {
191
 				return -1
191
 				return -1
192
 			}
192
 			}
193
 		}
193
 		}

+ 29
- 29
src/broker/modules/list_test.go Dosyayı Görüntüle

3
 import (
3
 import (
4
 	"fmt"
4
 	"fmt"
5
 	_ "fmt"
5
 	_ "fmt"
6
-	"testing"
7
 	"github.com/stretchr/testify/assert"
6
 	"github.com/stretchr/testify/assert"
7
+	"testing"
8
 )
8
 )
9
 
9
 
10
 func Test_addTopicNode(t *testing.T) {
10
 func Test_addTopicNode(t *testing.T) {
11
 
11
 
12
 	list := &topicList{nil, nil, 0}
12
 	list := &topicList{nil, nil, 0}
13
-	ary := []int64{123,456,789,1234}
13
+	ary := []int64{123, 456, 789, 1234}
14
 
14
 
15
-	for i:=0; i< 4; i++{
15
+	for i := 0; i < 4; i++ {
16
 		tmp := []int64{ary[i]}
16
 		tmp := []int64{ary[i]}
17
 		list.addTopicNode(tmp)
17
 		list.addTopicNode(tmp)
18
 		exp := []int64{ary[i]}
18
 		exp := []int64{ary[i]}
24
 func Test_addValueNode(t *testing.T) {
24
 func Test_addValueNode(t *testing.T) {
25
 
25
 
26
 	list := &valueList{nil, nil, 0}
26
 	list := &valueList{nil, nil, 0}
27
-	ary := []int64{123,456,789,1234}
27
+	ary := []int64{123, 456, 789, 1234}
28
 
28
 
29
-	for i:=0; i< 4; i++{
29
+	for i := 0; i < 4; i++ {
30
 		tmp := []int64{ary[i]}
30
 		tmp := []int64{ary[i]}
31
 		list.addValueNode(tmp)
31
 		list.addValueNode(tmp)
32
 		exp := []int64{ary[i]}
32
 		exp := []int64{ary[i]}
35
 	}
35
 	}
36
 }
36
 }
37
 
37
 
38
-func Test_findSub(t *testing.T){
38
+func Test_findSub(t *testing.T) {
39
 
39
 
40
-	ary := []int{1,2,3,4,5,6}
40
+	ary := []int{1, 2, 3, 4, 5, 6}
41
 	for i := 0; i < 6; i++ {
41
 	for i := 0; i < 6; i++ {
42
-		assert.Equal(t, findSub(ary, i + 1), i, "findSub is failed")
42
+		assert.Equal(t, findSub(ary, i+1), i, "findSub is failed")
43
 	}
43
 	}
44
 
44
 
45
 	// Don't Exist
45
 	// Don't Exist
50
 	assert.Equal(t, findSub(ary, 6), 4)
50
 	assert.Equal(t, findSub(ary, 6), 4)
51
 }
51
 }
52
 
52
 
53
-func Test_remove(t *testing.T){
53
+func Test_remove(t *testing.T) {
54
 	l := make([]int, 4)
54
 	l := make([]int, 4)
55
 	var i int
55
 	var i int
56
-	for i = 0; i < 4; i++{
56
+	for i = 0; i < 4; i++ {
57
 		l[i] = i
57
 		l[i] = i
58
 	}
58
 	}
59
 
59
 
60
 	// array = {0, 1, 2, 3} -> {0, 1, 3}
60
 	// array = {0, 1, 2, 3} -> {0, 1, 3}
61
 	l = remove(l, 2)
61
 	l = remove(l, 2)
62
-	assert.Equal(t, l,[]int{0, 1, 3}, "Array Delete is failed")
62
+	assert.Equal(t, l, []int{0, 1, 3}, "Array Delete is failed")
63
 
63
 
64
 	// array = {0, 1, 3} -> {1, 3}
64
 	// array = {0, 1, 3} -> {1, 3}
65
 	l = remove(l, 0)
65
 	l = remove(l, 0)
74
 	assert.Equal(t, l, []int{}, "Array Delete is failed")
74
 	assert.Equal(t, l, []int{}, "Array Delete is failed")
75
 }
75
 }
76
 
76
 
77
-func Test_isempty(t *testing.T){
78
-	node := &valueNode{next : nil, prev : nil}
77
+func Test_isempty(t *testing.T) {
78
+	node := &valueNode{next: nil, prev: nil}
79
 
79
 
80
 	// 1. One element in Node
80
 	// 1. One element in Node
81
 	node.single2sub_eb = append(node.single2sub_eb, 1)
81
 	node.single2sub_eb = append(node.single2sub_eb, 1)
91
 	node.single2sub_s = append(node.single2sub_s, 1)
91
 	node.single2sub_s = append(node.single2sub_s, 1)
92
 	node.single2sub_es = append(node.single2sub_es, 1)
92
 	node.single2sub_es = append(node.single2sub_es, 1)
93
 	node.single2sub_e = append(node.single2sub_e, 1)
93
 	node.single2sub_e = append(node.single2sub_e, 1)
94
-	assert.Equal(t, node.isEmpty(), false,"isEmpty is failed")
94
+	assert.Equal(t, node.isEmpty(), false, "isEmpty is failed")
95
 }
95
 }
96
 
96
 
97
-func Test_insertSub(t *testing.T){
98
-	node := &valueNode{next : nil, prev : nil}
97
+func Test_insertSub(t *testing.T) {
98
+	node := &valueNode{next: nil, prev: nil}
99
 
99
 
100
-// 1. Single2sub
100
+	// 1. Single2sub
101
 
101
 
102
 	// (1) <
102
 	// (1) <
103
 	node.insertSub("<", 1, true)
103
 	node.insertSub("<", 1, true)
104
-	assert.Equal(t, 0, findSub(node.single2sub_s, 1),"singleInsertSub (<) is failed")
104
+	assert.Equal(t, 0, findSub(node.single2sub_s, 1), "singleInsertSub (<) is failed")
105
 
105
 
106
 	// (2) <=
106
 	// (2) <=
107
 	node.insertSub("<=", 2, true)
107
 	node.insertSub("<=", 2, true)
119
 	node.insertSub("==", 5, true)
119
 	node.insertSub("==", 5, true)
120
 	assert.Equal(t, 0, findSub(node.single2sub_e, 5), "singleInsertSub (==) is failed")
120
 	assert.Equal(t, 0, findSub(node.single2sub_e, 5), "singleInsertSub (==) is failed")
121
 
121
 
122
-// 2. range2sub
122
+	// 2. range2sub
123
 
123
 
124
 	// (1) <
124
 	// (1) <
125
 	node.insertSub("<", 1, false)
125
 	node.insertSub("<", 1, false)
126
-	assert.Equal(t, 0, findSub(node.range2sub_s, 1),"rangeInsertSub (<) is failed")
126
+	assert.Equal(t, 0, findSub(node.range2sub_s, 1), "rangeInsertSub (<) is failed")
127
 
127
 
128
 	// (2) <=
128
 	// (2) <=
129
 	node.insertSub("<=", 2, false)
129
 	node.insertSub("<=", 2, false)
139
 
139
 
140
 }
140
 }
141
 
141
 
142
-func Test_getTopicPos(t *testing.T){
142
+func Test_getTopicPos(t *testing.T) {
143
 	l := topicList{}
143
 	l := topicList{}
144
 	l.addTopicNode(nil)
144
 	l.addTopicNode(nil)
145
 
145
 
146
 	// 1. find Topic in empty topiclist
146
 	// 1. find Topic in empty topiclist
147
-	if l.head.topic != nil{
147
+	if l.head.topic != nil {
148
 		fmt.Println("addTopicNode is failed")
148
 		fmt.Println("addTopicNode is failed")
149
 	}
149
 	}
150
 
150
 
151
 	// 2. find {{12},{34},{56},{78},{89},{90}} in topiclist
151
 	// 2. find {{12},{34},{56},{78},{89},{90}} in topiclist
152
-	ary := [][]int64{{12},{34},{56},{78},{89},{90}}
153
-	if l.getTopicNodePos(ary[0]) != nil{
152
+	ary := [][]int64{{12}, {34}, {56}, {78}, {89}, {90}}
153
+	if l.getTopicNodePos(ary[0]) != nil {
154
 		fmt.Println("getTopicNodePos is failed")
154
 		fmt.Println("getTopicNodePos is failed")
155
 	}
155
 	}
156
 
156
 
157
-	for i := 0; i < 5; i++{
157
+	for i := 0; i < 5; i++ {
158
 		l.addTopicNode(ary[i])
158
 		l.addTopicNode(ary[i])
159
 		assert.Equal(t, ary[i], l.tail.topic)
159
 		assert.Equal(t, ary[i], l.tail.topic)
160
 	}
160
 	}
161
 
161
 
162
 }
162
 }
163
 
163
 
164
-func Test_getValuePos(t *testing.T){
164
+func Test_getValuePos(t *testing.T) {
165
 	l := valueList{}
165
 	l := valueList{}
166
 	l.addValueNode(nil)
166
 	l.addValueNode(nil)
167
 
167
 
168
-	ary := [][]int64{{12},{34},{56},{78},{89},{90}}
168
+	ary := [][]int64{{12}, {34}, {56}, {78}, {89}, {90}}
169
 
169
 
170
 	// 1. find {{12},{34},{56},{78},{89},{90}} in topiclist
170
 	// 1. find {{12},{34},{56},{78},{89},{90}} in topiclist
171
-	for i := 0; i < 5; i++{
171
+	for i := 0; i < 5; i++ {
172
 		l.addValueNode(ary[i])
172
 		l.addValueNode(ary[i])
173
 		assert.Equal(t, ary[i], l.tail.val)
173
 		assert.Equal(t, ary[i], l.tail.val)
174
 	}
174
 	}
175
-}
175
+}

+ 28
- 28
src/broker/modules/matching.go Dosyayı Görüntüle

7
 )
7
 )
8
 
8
 
9
 type match_manager struct {
9
 type match_manager struct {
10
-	match_count	int
10
+	match_count int
11
 }
11
 }
12
 
12
 
13
 // Returns a list of IP addresses of matched subs
13
 // Returns a list of IP addresses of matched subs
14
 func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager) ([]string, MsgUnit, error) {
14
 func (match_mng *match_manager) Matching(queue *MsgQueue, sub_mng *sub_manager) ([]string, MsgUnit, error) {
15
 	msg := queue.pop(true)
15
 	msg := queue.pop(true)
16
-	topic := msg.(*PublishMsg).topic
17
-	value := msg.(*PublishMsg).value
16
+	topic := msg.(*PublishMsg).Topic
17
+	value := msg.(*PublishMsg).Value
18
 
18
 
19
 	// list
19
 	// list
20
 	ret := make([]string, 0)
20
 	ret := make([]string, 0)
23
 	big := make([]int, 0)
23
 	big := make([]int, 0)
24
 	small := make([]int, 0)
24
 	small := make([]int, 0)
25
 
25
 
26
-	// 1. Find (topicNode[topic] == msg.topic) Node
26
+	// 1. Find (topicNode[Topic] == msg.Topic) Node
27
 	topicPtr := sub_mng.list
27
 	topicPtr := sub_mng.list
28
 	pos := topicPtr.getTopicNodePos(topic)
28
 	pos := topicPtr.getTopicNodePos(topic)
29
 
29
 
30
 	// Don't Exist topicNode
30
 	// Don't Exist topicNode
31
-	if pos == nil{
32
-		return nil,nil,errors.New("Don't Exist topicNode")
31
+	if pos == nil {
32
+		return nil, nil, errors.New("Don't Exist topicNode")
33
 	}
33
 	}
34
 
34
 
35
 	// 2. Traverse all valueNode -> and Match
35
 	// 2. Traverse all valueNode -> and Match
36
 	valPtr := pos.list.head
36
 	valPtr := pos.list.head
37
-	for valPtr != nil{
37
+	for valPtr != nil {
38
 		compare := Compare(valPtr.val, value)
38
 		compare := Compare(valPtr.val, value)
39
 		if compare < 0 { // sub.val > pub.val
39
 		if compare < 0 { // sub.val > pub.val
40
-		// single : { >, >= }
40
+			// single : { >, >= }
41
 
41
 
42
 			// (1) case : >
42
 			// (1) case : >
43
-			for i := 0; i < len(valPtr.single2sub_b); i++{
43
+			for i := 0; i < len(valPtr.single2sub_b); i++ {
44
 				sub := valPtr.single2sub_b[i]
44
 				sub := valPtr.single2sub_b[i]
45
 				ip := sub_mng.sub2ip[sub]
45
 				ip := sub_mng.sub2ip[sub]
46
 				ret = append(ret, ip)
46
 				ret = append(ret, ip)
47
 			}
47
 			}
48
 
48
 
49
 			// (2) case : >=
49
 			// (2) case : >=
50
-			for i := 0; i < len(valPtr.single2sub_eb); i++{
50
+			for i := 0; i < len(valPtr.single2sub_eb); i++ {
51
 				sub := valPtr.single2sub_eb[i]
51
 				sub := valPtr.single2sub_eb[i]
52
 				ip := sub_mng.sub2ip[sub]
52
 				ip := sub_mng.sub2ip[sub]
53
 				ret = append(ret, ip)
53
 				ret = append(ret, ip)
54
 			}
54
 			}
55
-		// range : { >, >= }
55
+			// range : { >, >= }
56
 
56
 
57
 			// (1) case : >
57
 			// (1) case : >
58
-			for i := 0; i < len(valPtr.range2sub_b); i++{
58
+			for i := 0; i < len(valPtr.range2sub_b); i++ {
59
 				sub := valPtr.range2sub_b[i]
59
 				sub := valPtr.range2sub_b[i]
60
 				big = append(big, sub)
60
 				big = append(big, sub)
61
 			}
61
 			}
62
 
62
 
63
 			// (2) case : >=
63
 			// (2) case : >=
64
-			for i := 0; i < len(valPtr.range2sub_eb); i++{
64
+			for i := 0; i < len(valPtr.range2sub_eb); i++ {
65
 				sub := valPtr.range2sub_eb[i]
65
 				sub := valPtr.range2sub_eb[i]
66
 				big = append(big, sub)
66
 				big = append(big, sub)
67
 			}
67
 			}
68
 
68
 
69
 		} else if compare > 0 { // sub.val < pub.val
69
 		} else if compare > 0 { // sub.val < pub.val
70
 
70
 
71
-		// single : { <, <= }
71
+			// single : { <, <= }
72
 
72
 
73
 			// (1) case : <
73
 			// (1) case : <
74
-			for i := 0; i < len(valPtr.single2sub_s); i++{
74
+			for i := 0; i < len(valPtr.single2sub_s); i++ {
75
 				sub := valPtr.single2sub_s[i]
75
 				sub := valPtr.single2sub_s[i]
76
 				ip := sub_mng.sub2ip[sub]
76
 				ip := sub_mng.sub2ip[sub]
77
 				ret = append(ret, ip)
77
 				ret = append(ret, ip)
78
 			}
78
 			}
79
 
79
 
80
 			// (2) case : <=
80
 			// (2) case : <=
81
-			for i := 0; i < len(valPtr.single2sub_es); i++{
81
+			for i := 0; i < len(valPtr.single2sub_es); i++ {
82
 				sub := valPtr.single2sub_es[i]
82
 				sub := valPtr.single2sub_es[i]
83
 				ip := sub_mng.sub2ip[sub]
83
 				ip := sub_mng.sub2ip[sub]
84
 				ret = append(ret, ip)
84
 				ret = append(ret, ip)
85
 			}
85
 			}
86
 
86
 
87
-		// range : { <, <= }
87
+			// range : { <, <= }
88
 
88
 
89
 			// (1) case : <
89
 			// (1) case : <
90
-			for i := 0; i < len(valPtr.range2sub_s); i++{
90
+			for i := 0; i < len(valPtr.range2sub_s); i++ {
91
 				sub := valPtr.range2sub_s[i]
91
 				sub := valPtr.range2sub_s[i]
92
 				small = append(small, sub)
92
 				small = append(small, sub)
93
 			}
93
 			}
94
 			// (2) case : <=
94
 			// (2) case : <=
95
-			for i := 0; i < len(valPtr.range2sub_es); i++{
95
+			for i := 0; i < len(valPtr.range2sub_es); i++ {
96
 				sub := valPtr.range2sub_es[i]
96
 				sub := valPtr.range2sub_es[i]
97
 				small = append(small, sub)
97
 				small = append(small, sub)
98
 			}
98
 			}
99
 
99
 
100
-		} else{ // sub.val == pub.val
100
+		} else { // sub.val == pub.val
101
 
101
 
102
-		// single : { <=, >=, ==}
102
+			// single : { <=, >=, ==}
103
 
103
 
104
 			// (1) case : <=
104
 			// (1) case : <=
105
-			for i := 0; i < len(valPtr.single2sub_es); i++{
105
+			for i := 0; i < len(valPtr.single2sub_es); i++ {
106
 				sub := valPtr.single2sub_es[i]
106
 				sub := valPtr.single2sub_es[i]
107
 				ip := sub_mng.sub2ip[sub]
107
 				ip := sub_mng.sub2ip[sub]
108
 				ret = append(ret, ip)
108
 				ret = append(ret, ip)
109
 			}
109
 			}
110
 
110
 
111
 			// (2) case : >=
111
 			// (2) case : >=
112
-			for i := 0; i < len(valPtr.single2sub_eb); i++{
112
+			for i := 0; i < len(valPtr.single2sub_eb); i++ {
113
 				sub := valPtr.single2sub_eb[i]
113
 				sub := valPtr.single2sub_eb[i]
114
 				ip := sub_mng.sub2ip[sub]
114
 				ip := sub_mng.sub2ip[sub]
115
 				ret = append(ret, ip)
115
 				ret = append(ret, ip)
116
 			}
116
 			}
117
 
117
 
118
 			// (3) case : ==
118
 			// (3) case : ==
119
-			for i := 0; i < len(valPtr.single2sub_e); i++{
119
+			for i := 0; i < len(valPtr.single2sub_e); i++ {
120
 				sub := valPtr.single2sub_e[i]
120
 				sub := valPtr.single2sub_e[i]
121
 				ip := sub_mng.sub2ip[sub]
121
 				ip := sub_mng.sub2ip[sub]
122
 				ret = append(ret, ip)
122
 				ret = append(ret, ip)
123
 			}
123
 			}
124
 
124
 
125
-		// range : { <=, >= }
125
+			// range : { <=, >= }
126
 
126
 
127
 			// (1) case : <=
127
 			// (1) case : <=
128
-			for i := 0; i < len(valPtr.range2sub_es); i++{
128
+			for i := 0; i < len(valPtr.range2sub_es); i++ {
129
 				sub := valPtr.range2sub_es[i]
129
 				sub := valPtr.range2sub_es[i]
130
 				small = append(small, sub)
130
 				small = append(small, sub)
131
 			}
131
 			}
132
 
132
 
133
 			// (2) case : >=
133
 			// (2) case : >=
134
-			for i := 0; i < len(valPtr.range2sub_eb); i++{
134
+			for i := 0; i < len(valPtr.range2sub_eb); i++ {
135
 				sub := valPtr.range2sub_eb[i]
135
 				sub := valPtr.range2sub_eb[i]
136
 				big = append(big, sub)
136
 				big = append(big, sub)
137
 			}
137
 			}
141
 
141
 
142
 	// Add the intersection IP address of two sets (large and small) to the return list
142
 	// Add the intersection IP address of two sets (large and small) to the return list
143
 	hash := intersect.Hash(small, big)
143
 	hash := intersect.Hash(small, big)
144
-	for i := 0; i < len(hash); i++{
144
+	for i := 0; i < len(hash); i++ {
145
 		sub := hash[i].(int)
145
 		sub := hash[i].(int)
146
 		ip := sub_mng.sub2ip[sub]
146
 		ip := sub_mng.sub2ip[sub]
147
 		ret = append(ret, ip)
147
 		ret = append(ret, ip)

+ 24
- 38
src/broker/modules/message.go Dosyayı Görüntüle

16
 //*****메세지 틀*****
16
 //*****메세지 틀*****
17
 
17
 
18
 type Message struct {
18
 type Message struct {
19
-	from    string //ip주소
20
-	version string
21
-	time    string
22
-	kind    int //종류
23
-}
24
-
25
-func (msg Message) From() string {
26
-	return msg.from
27
-}
28
-
29
-func (msg Message) Version() string {
30
-	return msg.version
31
-}
32
-
33
-func (msg Message) Time() string {
34
-	return msg.time
35
-}
36
-
37
-func (msg Message) Kind() int {
38
-	return msg.kind
19
+	From    string //ip주소
20
+	Version string
21
+	Time    string
22
+	Kind    int //종류
39
 }
23
 }
40
 
24
 
41
 type MsgUnit interface {
25
 type MsgUnit interface {
42
 	// ConvertToJson - send 전 json형식으로 바꾸는 함수
26
 	// ConvertToJson - send 전 json형식으로 바꾸는 함수
43
 	ConvertToJson() ([]byte, error)
27
 	ConvertToJson() ([]byte, error)
28
+	//JsontoMsg
29
+	//JsontoMsg()(msg MsgUnit)
44
 	// CheckType - Message의 타입을 알려줌
30
 	// CheckType - Message의 타입을 알려줌
45
 	CheckType() int
31
 	CheckType() int
46
 }
32
 }
62
 //전달할 내용을 담은 메세지
48
 //전달할 내용을 담은 메세지
63
 type PublishMsg struct {
49
 type PublishMsg struct {
64
 	Message
50
 	Message
65
-	topic []int64 //대주제
66
-	value []int64 //topic의 세부적인 내용
67
-	content []int64 // 내용
51
+	Topic   []int64 //대주제
52
+	Value   []int64 //topic의 세부적인 내용
53
+	Content []int64 // 내용
68
 }
54
 }
69
 
55
 
70
 //구독 정보를 담은 메세지
56
 //구독 정보를 담은 메세지
71
 type SubscriptionMsg struct {
57
 type SubscriptionMsg struct {
72
 	Message
58
 	Message
73
-	topic []int64 //대주제
74
-	value []int64 //피연산자
59
+	topic    []int64  //대주제
60
+	value    []int64  //피연산자
75
 	operator []string //연산자
61
 	operator []string //연산자
76
-  	isAlpha bool //value가 숫자인지 문자열인지
62
+	isAlpha  bool     //value가 숫자인지 문자열인지
77
 }
63
 }
78
 
64
 
79
 //Microservice 등록 메세지
65
 //Microservice 등록 메세지
88
 
74
 
89
 //**************************
75
 //**************************
90
 
76
 
91
-func (msg *KeyGenMsg) ConvertToJson() ([]byte, error) {
77
+func (msg KeyGenMsg) ConvertToJson() ([]byte, error) {
92
 	js := msg
78
 	js := msg
93
 	jsonBytes, err := json.Marshal(js)
79
 	jsonBytes, err := json.Marshal(js)
94
 	return jsonBytes, err
80
 	return jsonBytes, err
95
 }
81
 }
96
 
82
 
97
-func (msg *KeyShareMsg) ConvertToJson() ([]byte, error) {
83
+func (msg KeyShareMsg) ConvertToJson() ([]byte, error) {
98
 	js := msg
84
 	js := msg
99
 	jsonBytes, err := json.Marshal(js)
85
 	jsonBytes, err := json.Marshal(js)
100
 	return jsonBytes, err
86
 	return jsonBytes, err
101
 }
87
 }
102
 
88
 
103
-func (msg *PublishMsg) ConvertToJson() ([]byte, error) {
89
+func (msg PublishMsg) ConvertToJson() ([]byte, error) {
104
 	js := msg
90
 	js := msg
105
 	jsonBytes, err := json.Marshal(js)
91
 	jsonBytes, err := json.Marshal(js)
106
 	return jsonBytes, err
92
 	return jsonBytes, err
107
 }
93
 }
108
 
94
 
109
-func (msg *SubscriptionMsg) ConvertToJson() ([]byte, error) {
95
+func (msg SubscriptionMsg) ConvertToJson() ([]byte, error) {
110
 	js := msg
96
 	js := msg
111
 	jsonBytes, err := json.Marshal(js)
97
 	jsonBytes, err := json.Marshal(js)
112
 	return jsonBytes, err
98
 	return jsonBytes, err
113
 }
99
 }
114
 
100
 
115
-func (msg *RegisterMsg) ConvertToJson() ([]byte, error) {
101
+func (msg RegisterMsg) ConvertToJson() ([]byte, error) {
116
 	js := msg
102
 	js := msg
117
 	jsonBytes, err := json.Marshal(js)
103
 	jsonBytes, err := json.Marshal(js)
118
 	return jsonBytes, err
104
 	return jsonBytes, err
119
 }
105
 }
120
 
106
 
121
-func (msg *WithdrawMsg) ConvertToJson() ([]byte, error) {
107
+func (msg WithdrawMsg) ConvertToJson() ([]byte, error) {
122
 	js := msg
108
 	js := msg
123
 	jsonBytes, err := json.Marshal(js)
109
 	jsonBytes, err := json.Marshal(js)
124
 	return jsonBytes, err
110
 	return jsonBytes, err
125
 }
111
 }
126
 
112
 
127
 func (msg Message) CheckType() int {
113
 func (msg Message) CheckType() int {
128
-	return msg.kind
114
+	return msg.Kind
129
 }
115
 }
130
 
116
 
131
 //KeyGenMsg 생성자
117
 //KeyGenMsg 생성자
132
-func NewKeyGenMsg(table *MStable) *KeyGenMsg{
133
-	m:= &KeyGenMsg{}
134
-	for _,value:= range table.NodeTable{
135
-		m.iptable=append(m.iptable,value.Getipaddr())
118
+func NewKeyGenMsg(table *MStable) *KeyGenMsg {
119
+	m := &KeyGenMsg{}
120
+	for _, value := range table.NodeTable {
121
+		m.iptable = append(m.iptable, value.Getipaddr())
136
 	}
122
 	}
137
 	return m
123
 	return m
138
-}
124
+}

+ 7
- 3
src/broker/modules/queue.go Dosyayı Görüntüle

1
 package modules
1
 package modules
2
 
2
 
3
-import "errors"
3
+import (
4
+	"errors"
5
+	"log"
6
+)
4
 
7
 
5
 type MsgQueue struct {
8
 type MsgQueue struct {
6
 	queue     chan MsgUnit
9
 	queue     chan MsgUnit
21
 		return errors.New("Queue Hadlerer Error: Already initialized.")
24
 		return errors.New("Queue Hadlerer Error: Already initialized.")
22
 	} else if mq.queue == nil {
25
 	} else if mq.queue == nil {
23
 		mq.queue = make(chan MsgUnit, 1000)
26
 		mq.queue = make(chan MsgUnit, 1000)
27
+		log.Println("queue is initialized.")
24
 		return nil
28
 		return nil
25
 	} else {
29
 	} else {
26
 		close(mq.queue)
30
 		close(mq.queue)
34
 	return true
38
 	return true
35
 }
39
 }
36
 
40
 
37
-func (mq *MsgQueue) pop(wait bool) MsgUnit {
38
-	if wait == true {
41
+func (mq *MsgQueue) pop(block bool) MsgUnit {
42
+	if block == false {
39
 		if len(mq.queue) == 0 {
43
 		if len(mq.queue) == 0 {
40
 			return nil
44
 			return nil
41
 		} else {
45
 		} else {

+ 5
- 5
src/broker/modules/secure.go Dosyayı Görüntüle

29
 keyShareMsg 에서 각 노드의 private 키를 받아 keyMap 에 저장
29
 keyShareMsg 에서 각 노드의 private 키를 받아 keyMap 에 저장
30
 */
30
 */
31
 func (sc Security) RegKey(ksm KeyShareMsg) {
31
 func (sc Security) RegKey(ksm KeyShareMsg) {
32
-	sc.KeyMap[ksm.Message.From()] = ksm.key
32
+	sc.KeyMap[ksm.Message.From] = ksm.key
33
 }
33
 }
34
 
34
 
35
 /**
35
 /**
67
 // topic과 value는 m+k로만 존재하므로 ReEnc과정에서 subscriber의 개인키만 더해주면 된다.
67
 // topic과 value는 m+k로만 존재하므로 ReEnc과정에서 subscriber의 개인키만 더해주면 된다.
68
 func (sc Security) ReEncPubMsg(fromPubMsg PublishMsg, nodeName string) *PublishMsg {
68
 func (sc Security) ReEncPubMsg(fromPubMsg PublishMsg, nodeName string) *PublishMsg {
69
 	toKey := sc.GetNodeKey(nodeName)
69
 	toKey := sc.GetNodeKey(nodeName)
70
-	fromKey := sc.GetNodeKey(fromPubMsg.Message.From())
70
+	fromKey := sc.GetNodeKey(fromPubMsg.Message.From)
71
 
71
 
72
 	toPubMsg := new(PublishMsg)
72
 	toPubMsg := new(PublishMsg)
73
 	toPubMsg.Message = fromPubMsg.Message
73
 	toPubMsg.Message = fromPubMsg.Message
74
-	toPubMsg.topic = sc.ReEncryptWithoutPrivateKey(toKey, fromPubMsg.topic)
75
-	toPubMsg.value = sc.ReEncryptWithoutPrivateKey(toKey, fromPubMsg.value)
76
-	toPubMsg.content = sc.ReEncrypt(fromKey, toKey, fromPubMsg.content)
74
+	toPubMsg.Topic = sc.ReEncryptWithoutPrivateKey(toKey, fromPubMsg.Topic)
75
+	toPubMsg.Value = sc.ReEncryptWithoutPrivateKey(toKey, fromPubMsg.Value)
76
+	toPubMsg.Content = sc.ReEncrypt(fromKey, toKey, fromPubMsg.Content)
77
 
77
 
78
 	return toPubMsg
78
 	return toPubMsg
79
 }
79
 }

+ 43
- 52
src/broker/modules/secure_test.go Dosyayı Görüntüle

10
 	var sm SecurityManager
10
 	var sm SecurityManager
11
 	sm = security
11
 	sm = security
12
 
12
 
13
-	ksm := KeyShareMsg{Message: Message{from: "1.1.1.1", version: "1", time: "2", kind: 1}, key: "1234"}
13
+	ksm := KeyShareMsg{Message: Message{From: "1.1.1.1", Version: "1", Time: "2", Kind: 1}, key: "1234"}
14
 	sm.RegKey(ksm)
14
 	sm.RegKey(ksm)
15
-	sm.GetNodeKey(ksm.Message.From())
16
-	fmt.Println(sm.GetNodeKey(ksm.Message.From()))
15
+	sm.GetNodeKey(ksm.Message.From)
16
+	fmt.Println(sm.GetNodeKey(ksm.Message.From))
17
 	var targetKey []int64
17
 	var targetKey []int64
18
 	targetKey = []int64{1234, 1235, 1236}
18
 	targetKey = []int64{1234, 1235, 1236}
19
-	fmt.Println(sm.ReEncrypt(sm.GetNodeKey(ksm.Message.From()), 0, targetKey))
19
+	fmt.Println(sm.ReEncrypt(sm.GetNodeKey(ksm.Message.From), 0, targetKey))
20
 	//fmt.Println(sm.CompareDigit(1236, 1234))
20
 	//fmt.Println(sm.CompareDigit(1236, 1234))
21
 
21
 
22
 }
22
 }
26
 	toPubMsg.Message = msg
26
 	toPubMsg.Message = msg
27
 
27
 
28
 	intArr := []rune(topic)
28
 	intArr := []rune(topic)
29
-	//fmt.Print("topic length ")
29
+	//fmt.Print("Topic length ")
30
 	//fmt.Println(len(intArr))
30
 	//fmt.Println(len(intArr))
31
-	//fmt.Println(len(toPubMsg.topic))
31
+	//fmt.Println(len(toPubMsg.Topic))
32
 	for index := 0; index < len(intArr); index++ {
32
 	for index := 0; index < len(intArr); index++ {
33
-		toPubMsg.topic = append(toPubMsg.topic, int64(intArr[index]))
33
+		toPubMsg.Topic = append(toPubMsg.Topic, int64(intArr[index]))
34
 	}
34
 	}
35
-	//fmt.Println(len(toPubMsg.topic))
35
+	//fmt.Println(len(toPubMsg.Topic))
36
 	intArr = []rune(value)
36
 	intArr = []rune(value)
37
 	for index := 0; index < len(intArr); index++ {
37
 	for index := 0; index < len(intArr); index++ {
38
-		toPubMsg.value = append(toPubMsg.value, int64(intArr[index]))
38
+		toPubMsg.Value = append(toPubMsg.Value, int64(intArr[index]))
39
 	}
39
 	}
40
 	intArr = []rune(content)
40
 	intArr = []rune(content)
41
 	for index := 0; index < len(intArr); index++ {
41
 	for index := 0; index < len(intArr); index++ {
42
-		toPubMsg.content = append(toPubMsg.content, int64(intArr[index]))
42
+		toPubMsg.Content = append(toPubMsg.Content, int64(intArr[index]))
43
 	}
43
 	}
44
 
44
 
45
 	return toPubMsg
45
 	return toPubMsg
46
 }
46
 }
47
 
47
 
48
 func EncryptionMsg(msg *PublishMsg, gyKey int64, privateKey int64) *PublishMsg {
48
 func EncryptionMsg(msg *PublishMsg, gyKey int64, privateKey int64) *PublishMsg {
49
-	for index := range msg.topic {
50
-		msg.topic[index] = msg.topic[index] + gyKey
49
+	for index := range msg.Topic {
50
+		msg.Topic[index] = msg.Topic[index] + gyKey
51
 	}
51
 	}
52
-	for index := range msg.value {
53
-		msg.value[index] = msg.value[index] + gyKey
52
+	for index := range msg.Value {
53
+		msg.Value[index] = msg.Value[index] + gyKey
54
 	}
54
 	}
55
-	for index := range msg.content {
56
-		msg.content[index] = msg.content[index] + gyKey + privateKey
55
+	for index := range msg.Content {
56
+		msg.Content[index] = msg.Content[index] + gyKey + privateKey
57
 	}
57
 	}
58
 
58
 
59
-
60
 	return msg
59
 	return msg
61
 }
60
 }
62
 
61
 
63
 func DecryptionMsg(msg *PublishMsg, gyKey int64, privateKey int64) {
62
 func DecryptionMsg(msg *PublishMsg, gyKey int64, privateKey int64) {
64
-	for index := range msg.topic {
65
-		msg.topic[index] = msg.topic[index] - gyKey - privateKey
63
+	for index := range msg.Topic {
64
+		msg.Topic[index] = msg.Topic[index] - gyKey - privateKey
66
 	}
65
 	}
67
-	for index := range msg.value {
68
-		msg.value[index] = msg.value[index] - gyKey - privateKey
66
+	for index := range msg.Value {
67
+		msg.Value[index] = msg.Value[index] - gyKey - privateKey
69
 	}
68
 	}
70
-	for index := range msg.content {
71
-		msg.content[index] = msg.content[index] - gyKey - privateKey
69
+	for index := range msg.Content {
70
+		msg.Content[index] = msg.Content[index] - gyKey - privateKey
72
 	}
71
 	}
73
 
72
 
74
 	var runeArr []rune
73
 	var runeArr []rune
75
-	for index := range msg.topic {
76
-		runeArr = append(runeArr, rune(int(msg.topic[index])))
74
+	for index := range msg.Topic {
75
+		runeArr = append(runeArr, rune(int(msg.Topic[index])))
77
 	}
76
 	}
78
-	fmt.Println("topic is: " + string(runeArr))
77
+	fmt.Println("Topic is: " + string(runeArr))
79
 	runeArr = nil
78
 	runeArr = nil
80
 
79
 
81
-	for index := range msg.value {
82
-		runeArr = append(runeArr, rune(int(msg.value[index])))
80
+	for index := range msg.Value {
81
+		runeArr = append(runeArr, rune(int(msg.Value[index])))
83
 	}
82
 	}
84
-	fmt.Println("value is: " + string(runeArr))
83
+	fmt.Println("Value is: " + string(runeArr))
85
 	runeArr = nil
84
 	runeArr = nil
86
 
85
 
87
-	for index := range msg.content {
88
-		runeArr = append(runeArr, rune(int(msg.content[index])))
86
+	for index := range msg.Content {
87
+		runeArr = append(runeArr, rune(int(msg.Content[index])))
89
 	}
88
 	}
90
-	fmt.Println("content is: " + string(runeArr))
89
+	fmt.Println("Content is: " + string(runeArr))
91
 	runeArr = nil
90
 	runeArr = nil
92
 
91
 
93
 }
92
 }
94
 
93
 
95
-func printMsg(msg *PublishMsg){
94
+func printMsg(msg *PublishMsg) {
96
 	var runeArr []rune
95
 	var runeArr []rune
97
-	for index := range msg.topic {
98
-		runeArr = append(runeArr, rune(int(msg.topic[index])))
96
+	for index := range msg.Topic {
97
+		runeArr = append(runeArr, rune(int(msg.Topic[index])))
99
 	}
98
 	}
100
-	fmt.Println("topic is: " + string(runeArr))
99
+	fmt.Println("Topic is: " + string(runeArr))
101
 	runeArr = nil
100
 	runeArr = nil
102
 
101
 
103
-	for index := range msg.value {
104
-		runeArr = append(runeArr, rune(int(msg.value[index])))
102
+	for index := range msg.Value {
103
+		runeArr = append(runeArr, rune(int(msg.Value[index])))
105
 	}
104
 	}
106
-	fmt.Println("value is: " + string(runeArr))
105
+	fmt.Println("Value is: " + string(runeArr))
107
 	runeArr = nil
106
 	runeArr = nil
108
 
107
 
109
-	for index := range msg.content {
110
-		runeArr = append(runeArr, rune(int(msg.content[index])))
108
+	for index := range msg.Content {
109
+		runeArr = append(runeArr, rune(int(msg.Content[index])))
111
 	}
110
 	}
112
-	fmt.Println("content is: " + string(runeArr) + "\n\n")
111
+	fmt.Println("Content is: " + string(runeArr) + "\n\n")
113
 	runeArr = nil
112
 	runeArr = nil
114
 }
113
 }
115
 
114
 
116
-// from "1.1.1.1" to "3.3.3.3" node
115
+// From "1.1.1.1" to "3.3.3.3" node
117
 func TestReEnc(t *testing.T) {
116
 func TestReEnc(t *testing.T) {
118
 	var security = NewSecurity()
117
 	var security = NewSecurity()
119
 	var sm SecurityManager
118
 	var sm SecurityManager
122
 	security.KeyMap["3.3.3.3"] = "99999"
121
 	security.KeyMap["3.3.3.3"] = "99999"
123
 
122
 
124
 	//fmt.Println(sm.GetNodeKey("3.3.3.3"))
123
 	//fmt.Println(sm.GetNodeKey("3.3.3.3"))
125
-	msg := Message{from: "1.1.1.1", version: "1", time: "123", kind: 3}
124
+	msg := Message{From: "1.1.1.1", Version: "1", Time: "123", Kind: 3}
126
 	//fmt.Println(msg)
125
 	//fmt.Println(msg)
127
 	publishMsg := CreatePubMsg(msg, "soccer123한글", "playerList", "Son and 10 players")
126
 	publishMsg := CreatePubMsg(msg, "soccer123한글", "playerList", "Son and 10 players")
128
 	fmt.Println(publishMsg)
127
 	fmt.Println(publishMsg)
139
 	fmt.Println("decrypted publish message is...")
138
 	fmt.Println("decrypted publish message is...")
140
 	DecryptionMsg(reEncPublishMsg, 1234, 99999)
139
 	DecryptionMsg(reEncPublishMsg, 1234, 99999)
141
 }
140
 }
142
-
143
-//func TestReEncUsingSHA(r *testing.T){
144
-//	var security = NewSecurity()
145
-//	var sm SecurityManager
146
-//	sm = security
147
-//	security.KeyMap["1.1.1.1"] = "56789"
148
-//	security.KeyMap["3.3.3.3"] = "99999"
149
-//}

+ 10
- 10
src/broker/modules/subscription.go Dosyayı Görüntüle

8
 	list topicList
8
 	list topicList
9
 
9
 
10
 	/* Manage sub# */
10
 	/* Manage sub# */
11
-	count_sub 	 int               // Subscription #
11
+	count_sub  int                  // Subscription #
12
 	emptylist  []int                // To administrate sub #
12
 	emptylist  []int                // To administrate sub #
13
 	ip2sub     map[string][]int     // ip2sub[ip] = sub# ...
13
 	ip2sub     map[string][]int     // ip2sub[ip] = sub# ...
14
-	sub2ip	   map[int]string		// sub2ip[sub#] = ip
14
+	sub2ip     map[int]string       // sub2ip[sub#] = ip
15
 	sub2node   map[int][]*valueNode // sub2node[sub#] = node_addr ...
15
 	sub2node   map[int][]*valueNode // sub2node[sub#] = node_addr ...
16
 	israngesub map[int]bool         // To manage when deleted
16
 	israngesub map[int]bool         // To manage when deleted
17
 }
17
 }
41
 	// * 1. Mapping incoming IP address to sub #
41
 	// * 1. Mapping incoming IP address to sub #
42
 
42
 
43
 	if len(manager.emptylist) == 0 {
43
 	if len(manager.emptylist) == 0 {
44
-		manager.ip2sub[msg.(*SubscriptionMsg).from] = append(manager.ip2sub[msg.(*SubscriptionMsg).from], manager.count_sub)
44
+		manager.ip2sub[msg.(*SubscriptionMsg).From] = append(manager.ip2sub[msg.(*SubscriptionMsg).From], manager.count_sub)
45
 		subnumber = manager.count_sub
45
 		subnumber = manager.count_sub
46
 		manager.count_sub++
46
 		manager.count_sub++
47
 	} else {
47
 	} else {
48
 		subidx := manager.emptylist[len(manager.emptylist)-1]
48
 		subidx := manager.emptylist[len(manager.emptylist)-1]
49
 		manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
49
 		manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
50
-		manager.ip2sub[msg.(*SubscriptionMsg).from] = append(manager.ip2sub[msg.(*SubscriptionMsg).from], subidx)
50
+		manager.ip2sub[msg.(*SubscriptionMsg).From] = append(manager.ip2sub[msg.(*SubscriptionMsg).From], subidx)
51
 		subnumber = subidx
51
 		subnumber = subidx
52
 	}
52
 	}
53
 
53
 
70
 		nameptr = manager.list.tail
70
 		nameptr = manager.list.tail
71
 	}
71
 	}
72
 
72
 
73
-	// Add value to list[name]
73
+	// Add Value to list[name]
74
 	if len(operator) == 1 { // if single expression
74
 	if len(operator) == 1 { // if single expression
75
 		valptr := nameptr.list.getValueNodePos(value)
75
 		valptr := nameptr.list.getValueNodePos(value)
76
 		if valptr == nil {
76
 		if valptr == nil {
88
 		// (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
88
 		// (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
89
 		logical_operator := operator[1]
89
 		logical_operator := operator[1]
90
 
90
 
91
-		// Find ValueNode = (namelist[name].list.val == value)
91
+		// Find ValueNode = (namelist[name].list.val == Value)
92
 		valptr1 := nameptr.list.getValueNodePos([]int64{value[0]})
92
 		valptr1 := nameptr.list.getValueNodePos([]int64{value[0]})
93
 		valptr2 := nameptr.list.getValueNodePos([]int64{value[1]})
93
 		valptr2 := nameptr.list.getValueNodePos([]int64{value[1]})
94
 
94
 
105
 		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr2)
105
 		manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr2)
106
 
106
 
107
 		if logical_operator == "&&" {
107
 		if logical_operator == "&&" {
108
-			// If they are enclosed in '&&' -> Insert value to range_operator_list
108
+			// If they are enclosed in '&&' -> Insert Value to range_operator_list
109
 			manager.israngesub[subnumber] = true
109
 			manager.israngesub[subnumber] = true
110
 			valptr1.insertSub(operator[0], subnumber, false)
110
 			valptr1.insertSub(operator[0], subnumber, false)
111
 			valptr2.insertSub(operator[2], subnumber, false)
111
 			valptr2.insertSub(operator[2], subnumber, false)
112
 
112
 
113
 		} else {
113
 		} else {
114
-			// if they are enclosed in '||' -> Insert value to single_operator_list
114
+			// if they are enclosed in '||' -> Insert Value to single_operator_list
115
 			valptr1.insertSub(operator[0], subnumber, true)
115
 			valptr1.insertSub(operator[0], subnumber, true)
116
 			valptr2.insertSub(operator[2], subnumber, true)
116
 			valptr2.insertSub(operator[2], subnumber, true)
117
 		}
117
 		}
214
 			}
214
 			}
215
 		}
215
 		}
216
 	}
216
 	}
217
-	manager.ip2sub[ip]=nil // Delete sub#s mapped to Ip address
217
+	manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
218
 	return nil
218
 	return nil
219
-}
219
+}

+ 289
- 288
src/broker/modules/subscription_test.go Dosyayı Görüntüle

1
 package modules
1
 package modules
2
 
2
 
3
-import (
4
-	"fmt"
5
-	_ "fmt"
6
-	"github.com/stretchr/testify/assert"
7
-	"math/rand"
8
-	"strconv"
9
-	"testing"
10
-	"time"
11
-)
12
-
13
-func makeData(isAlpha bool) MsgUnit{
14
-	rand.Seed(time.Now().UnixNano())
15
-	// Set Ipaddr
16
-	msg := Message{"", "1.0", "", SM}
17
-	for i := 0; i < 4; i++{
18
-		itoa := strconv.Itoa(rand.Int() % 256)
19
-		msg.from += itoa
20
-		if i != 3{
21
-			msg.from += "."
22
-		}
23
-	}
24
-
25
-	// Set Time
26
-	msg.time += strconv.Itoa(rand.Int()%24) + ":"
27
-	msg.time += strconv.Itoa(rand.Int()%60)
28
-
29
-	// Set Topic, Value, operator
30
-	topic := []int64{}
31
-	value := []int64{}
32
-	operator := []string{}
33
-	candOp := []string{">", ">=", "<=" ,"<", "=="}
34
-	logicalOp := []string{"&&", "||"}
35
-
36
-	if isAlpha {
37
-		topicLen := rand.Int() % 10 + 1
38
-		for i := 0 ; i < topicLen; i++{
39
-			topic = append(topic, rand.Int63())
40
-		}
41
-
42
-		valueLen := rand.Int() % 10 + 1
43
-		for i := 0 ; i < valueLen; i++{
44
-			value = append(value, rand.Int63())
45
-		}
46
-
47
-		operator = append(operator, "==")
48
-
49
-	} else{
50
-		valueLen := rand.Int() % 2 + 1
51
-
52
-		topic = append(topic, rand.Int63())
53
-		value = append(value, rand.Int63())
54
-
55
-		if valueLen == 1{
56
-			operator = append(operator, candOp[rand.Int()%5])
57
-		} else{
58
-			value = append(value, rand.Int63())
59
-			op := rand.Int()%2
60
-			operator = append(operator, candOp[rand.Int()%2])
61
-			if op == 0 {
62
-				operator = append(operator, logicalOp[0])
63
-			} else{
64
-				operator = append(operator, logicalOp[1])
65
-			}
66
-			operator = append(operator, candOp[rand.Int()%2 + 2])
67
-		}
68
-	}
69
-
70
-	return &SubscriptionMsg{msg, topic, value, operator, isAlpha}
71
-}
72
-
73
-func makeMsgList(dataLen int, isAlpha bool) []MsgUnit{
74
-	rand.Seed(time.Now().UnixNano())
75
-	var ret []MsgUnit
76
-	for i := 0 ; i < dataLen ; i++{
77
-		ret = append(ret, makeData(isAlpha))
78
-	}
79
-	return ret
80
-}
81
-
82
-func checkOperatorList(isSingle bool, sub int, operator string, l *valueNode) int{
83
-	ret := -1
84
-	if isSingle{
85
-		switch operator {
86
-		case "<":
87
-			ret = findSub(l.single2sub_s, sub)
88
-		case "<=":
89
-			ret = findSub(l.single2sub_es, sub)
90
-		case ">":
91
-			ret = findSub(l.single2sub_b, sub)
92
-		case ">=":
93
-			ret = findSub(l.single2sub_eb, sub)
94
-		case "==":
95
-			ret = findSub(l.single2sub_e, sub)
96
-		}
97
-
98
-	} else{
99
-		switch operator {
100
-		case "<":
101
-			ret = findSub(l.range2sub_s, sub)
102
-		case "<=":
103
-			ret = findSub(l.range2sub_es, sub)
104
-		case ">":
105
-			ret = findSub(l.range2sub_b, sub)
106
-		case ">=":
107
-			ret = findSub(l.range2sub_eb, sub)
108
-		}
109
-	}
110
-	return ret
111
-}
112
-
113
-func watchData(msgList []MsgUnit, dataLen int){
114
-	for i := 0; i < dataLen; i++{
115
-		msg := msgList[i]
116
-		fmt.Println(
117
-			"\nfrom = ", msg.(*SubscriptionMsg).Message.from,
118
-			"\ntime = ", msg.(*SubscriptionMsg).Message.time,
119
-			"\ntopic = ", msg.(*SubscriptionMsg).topic,
120
-			"\nvalue = ", msg.(*SubscriptionMsg).value,
121
-			"\noperator = ", msg.(*SubscriptionMsg).operator,
122
-			"\nisAlpha ?= ", msg.(*SubscriptionMsg).isAlpha,
123
-		)
124
-	}
125
-}
126
-
127
-//Test addSubScription(1) (dif all [topic, value, operator])
128
-func Test_addSubscription_allDif(t *testing.T) {
129
-	rand.Seed(time.Now().UnixNano())
130
-
131
-	// To Init sub_mng
132
-	s := sub_manager{
133
-		ip2sub:     make(map[string][]int),
134
-		sub2node:   make(map[int][]*valueNode),
135
-		israngesub: make(map[int]bool),
136
-	}
137
-	mos := Moscato{sub_mng: s}
138
-
139
-	// Make Data set(Subscription)
140
-	var msgList []MsgUnit
141
-	dataLen := 100
142
-	msgList = makeMsgList(dataLen, false)
143
-
144
-	//Watch Data set
145
-	//watchData(msgList, dataLen)
146
-
147
-	for i := 0; i < dataLen; i++ {
148
-		msg := msgList[i]
149
-		ip := msg.(*SubscriptionMsg).Message.from
150
-		topic := msg.(*SubscriptionMsg).topic
151
-		value := msg.(*SubscriptionMsg).value
152
-		operator := msg.(*SubscriptionMsg).operator
153
-		subnumber := mos.sub_mng.count_sub
154
-		isSingle := true
155
-
156
-		// 0. Check addSubscription
157
-		err := mos.sub_mng.addSubscription(msg)
158
-		assert.Equal(t, nil, err)
159
-
160
-		// 1. Check if ip mapping is correct
161
-		assert.Equal(t, subnumber, mos.sub_mng.ip2sub[ip][len(mos.sub_mng.ip2sub[ip])-1], "Ip mapping is failed")
162
-
163
-		// 2. Check topicNode
164
-		topicPtr := mos.sub_mng.list.head
165
-		for topicPtr != nil {
166
-			if Compare(topic, topicPtr.topic) == 0 {
167
-				break
168
-			}
169
-			topicPtr = topicPtr.next
170
-		}
171
-
172
-		assert.Equal(t, topic, topicPtr.topic, "topicNode Add is failed")
173
-
174
-		// Check isSingle ?
175
-		if len(operator) == 3 && operator[1] == "&&" {
176
-			isSingle = false
177
-		}
178
-
179
-		// 3. Check Value in ValueNode & Check Operator in ValueNode
180
-		if !isSingle || (len(operator) == 3 && operator[1] == "||") {
181
-			valptr1 := topicPtr.list.getValueNodePos([]int64{value[0]})
182
-			valptr2 := topicPtr.list.getValueNodePos([]int64{value[1]})
183
-
184
-			assert.Equal(t, []int64{value[0]}, valptr1.val)
185
-			assert.Equal(t, []int64{value[1]}, valptr2.val)
186
-
187
-			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, operator[0], valptr1))
188
-			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, operator[2], valptr2))
189
-
190
-		} else {
191
-			valptr := topicPtr.list.getValueNodePos(value)
192
-			assert.Equal(t, value, valptr.val)
193
-			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, operator[0], valptr))
194
-		}
195
-	}
196
-}
197
-
198
-// Test addSubScription(2) (same [topic, value] dif [operator])
199
-func Test_addSubscription_same_topicNvalue(t *testing.T) {
200
-	rand.Seed(time.Now().UnixNano())
201
-
202
-	// To Init sub_mng
203
-	s := sub_manager{
204
-		ip2sub:     make(map[string][]int),
205
-		sub2node:   make(map[int][]*valueNode),
206
-		israngesub: make(map[int]bool),
207
-	}
208
-
209
-	mos := Moscato{sub_mng: s}
210
-
211
-	// Fix Topic & Value
212
-	topicLen := rand.Int()%10 + 1
213
-	staticTopic := make([]int64, topicLen)
214
-	staticValue := []int64{rand.Int63()}
215
-
216
-
217
-	// Make Data
218
-	var msgList []MsgUnit
219
-	dataLen := 100
220
-	msgList = makeMsgList(dataLen, false)
221
-
222
-	for i := 0; i < topicLen; i++ {
223
-		staticTopic[i] = rand.Int63()
224
-	}
225
-
226
-	// Fix Same Topic & Value
227
-	for i := 0; i < dataLen; i++ {
228
-		operator := msgList[i].(*SubscriptionMsg).operator
229
-		msgList[i].(*SubscriptionMsg).topic = staticTopic
230
-
231
-		if len(operator) == 1{
232
-			msgList[i].(*SubscriptionMsg).value = staticValue
233
-		}
234
-	}
235
-
236
-	// Watch Data set
237
-	// watchData(msgList, dataLen)
238
-
239
-	for i := 0; i < dataLen; i++ {
240
-		msg := msgList[i]
241
-		err := mos.sub_mng.addSubscription(msg)
242
-		assert.Equal(t, nil, err)
243
-	}
244
-
245
-	topicPtr := mos.sub_mng.list.head
246
-	for topicPtr != nil {
247
-		if Compare(topicPtr.topic, staticTopic) == 0 {
248
-			break
249
-		}
250
-		topicPtr = topicPtr.next
251
-	}
252
-	watchValueNode(topicPtr.list.head)
253
-
254
-}
255
-
256
-func watchValueNode(ptr *valueNode) {
257
-	valPtr := ptr
258
-	for valPtr != nil{
259
-		fmt.Println("Value = ", valPtr.val)
260
-		if len(valPtr.single2sub_s) != 0 {
261
-			fmt.Println("Single2sub (<) List = ", valPtr.single2sub_s)
262
-		}
263
-		if len(valPtr.single2sub_es) != 0 {
264
-			fmt.Println("Single2sub (<=) List = ", valPtr.single2sub_es)
265
-		}
266
-		if len(valPtr.single2sub_b) != 0 {
267
-			fmt.Println("Single2sub (>) List = ", valPtr.single2sub_b)
268
-		}
269
-		if len(valPtr.single2sub_eb) != 0 {
270
-			fmt.Println("Single2sub (>=) List = ", valPtr.single2sub_eb)
271
-		}
272
-		if len(valPtr.single2sub_e) != 0 {
273
-			fmt.Println("Single2sub (==) List = ", valPtr.single2sub_e)
274
-		}
275
-
276
-		if len(valPtr.range2sub_s) != 0 {
277
-			fmt.Println("range2sub (<) List = ", valPtr.range2sub_s)
278
-		}
279
-		if len(valPtr.range2sub_es) != 0 {
280
-			fmt.Println("range2sub (<=) List = ", valPtr.range2sub_es)
281
-		}
282
-		if len(valPtr.range2sub_b) != 0 {
283
-			fmt.Println("range2sub (>) List = ", valPtr.range2sub_b)
284
-		}
285
-		if len(valPtr.range2sub_eb) != 0 {
286
-			fmt.Println("range2sub (>=) List = ", valPtr.range2sub_eb)
287
-		}
288
-		valPtr = valPtr.next
289
-	}
290
-}
3
+//
4
+//import (
5
+//	"fmt"
6
+//	_ "fmt"
7
+//	"github.com/stretchr/testify/assert"
8
+//	"math/rand"
9
+//	"strconv"
10
+//	"testing"
11
+//	"Time"
12
+//)
13
+//
14
+//func makeData(isAlpha bool) MsgUnit{
15
+//	rand.Seed(Time.Now().UnixNano())
16
+//	// Set Ipaddr
17
+//	msg := Message{"", "1.0", "", SM}
18
+//	for i := 0; i < 4; i++{
19
+//		itoa := strconv.Itoa(rand.Int() % 256)
20
+//		msg.From += itoa
21
+//		if i != 3{
22
+//			msg.From += "."
23
+//		}
24
+//	}
25
+//
26
+//	// Set Time
27
+//	msg.Time += strconv.Itoa(rand.Int()%24) + ":"
28
+//	msg.Time += strconv.Itoa(rand.Int()%60)
29
+//
30
+//	// Set Topic, Value, operator
31
+//	Topic := []int64{}
32
+//	Value := []int64{}
33
+//	operator := []string{}
34
+//	candOp := []string{">", ">=", "<=" ,"<", "=="}
35
+//	logicalOp := []string{"&&", "||"}
36
+//
37
+//	if isAlpha {
38
+//		topicLen := rand.Int() % 10 + 1
39
+//		for i := 0 ; i < topicLen; i++{
40
+//			Topic = append(Topic, rand.Int63())
41
+//		}
42
+//
43
+//		valueLen := rand.Int() % 10 + 1
44
+//		for i := 0 ; i < valueLen; i++{
45
+//			Value = append(Value, rand.Int63())
46
+//		}
47
+//
48
+//		operator = append(operator, "==")
49
+//
50
+//	} else{
51
+//		valueLen := rand.Int() % 2 + 1
52
+//
53
+//		Topic = append(Topic, rand.Int63())
54
+//		Value = append(Value, rand.Int63())
55
+//
56
+//		if valueLen == 1{
57
+//			operator = append(operator, candOp[rand.Int()%5])
58
+//		} else{
59
+//			Value = append(Value, rand.Int63())
60
+//			op := rand.Int()%2
61
+//			operator = append(operator, candOp[rand.Int()%2])
62
+//			if op == 0 {
63
+//				operator = append(operator, logicalOp[0])
64
+//			} else{
65
+//				operator = append(operator, logicalOp[1])
66
+//			}
67
+//			operator = append(operator, candOp[rand.Int()%2 + 2])
68
+//		}
69
+//	}
70
+//
71
+//	return &SubscriptionMsg{msg, Topic, Value, operator, isAlpha}
72
+//}
73
+//
74
+//func makeMsgList(dataLen int, isAlpha bool) []MsgUnit{
75
+//	rand.Seed(Time.Now().UnixNano())
76
+//	var ret []MsgUnit
77
+//	for i := 0 ; i < dataLen ; i++{
78
+//		ret = append(ret, makeData(isAlpha))
79
+//	}
80
+//	return ret
81
+//}
82
+//
83
+//func checkOperatorList(isSingle bool, sub int, operator string, l *valueNode) int{
84
+//	ret := -1
85
+//	if isSingle{
86
+//		switch operator {
87
+//		case "<":
88
+//			ret = findSub(l.single2sub_s, sub)
89
+//		case "<=":
90
+//			ret = findSub(l.single2sub_es, sub)
91
+//		case ">":
92
+//			ret = findSub(l.single2sub_b, sub)
93
+//		case ">=":
94
+//			ret = findSub(l.single2sub_eb, sub)
95
+//		case "==":
96
+//			ret = findSub(l.single2sub_e, sub)
97
+//		}
98
+//
99
+//	} else{
100
+//		switch operator {
101
+//		case "<":
102
+//			ret = findSub(l.range2sub_s, sub)
103
+//		case "<=":
104
+//			ret = findSub(l.range2sub_es, sub)
105
+//		case ">":
106
+//			ret = findSub(l.range2sub_b, sub)
107
+//		case ">=":
108
+//			ret = findSub(l.range2sub_eb, sub)
109
+//		}
110
+//	}
111
+//	return ret
112
+//}
113
+//
114
+//func watchData(msgList []MsgUnit, dataLen int){
115
+//	for i := 0; i < dataLen; i++{
116
+//		msg := msgList[i]
117
+//		fmt.Println(
118
+//			"\nFrom = ", msg.(*SubscriptionMsg).Message.From,
119
+//			"\nTime = ", msg.(*SubscriptionMsg).Message.Time,
120
+//			"\nTopic = ", msg.(*SubscriptionMsg).Topic,
121
+//			"\nValue = ", msg.(*SubscriptionMsg).Value,
122
+//			"\noperator = ", msg.(*SubscriptionMsg).operator,
123
+//			"\nisAlpha ?= ", msg.(*SubscriptionMsg).isAlpha,
124
+//		)
125
+//	}
126
+//}
127
+//
128
+////Test addSubScription(1) (dif all [Topic, Value, operator])
129
+//func Test_addSubscription_allDif(t *testing.T) {
130
+//	rand.Seed(Time.Now().UnixNano())
131
+//
132
+//	// To Init sub_mng
133
+//	s := sub_manager{
134
+//		ip2sub:     make(map[string][]int),
135
+//		sub2node:   make(map[int][]*valueNode),
136
+//		israngesub: make(map[int]bool),
137
+//	}
138
+//	mos := Moscato{sub_mng: s}
139
+//
140
+//	// Make Data set(Subscription)
141
+//	var msgList []MsgUnit
142
+//	dataLen := 100
143
+//	msgList = makeMsgList(dataLen, false)
144
+//
145
+//	//Watch Data set
146
+//	//watchData(msgList, dataLen)
147
+//
148
+//	for i := 0; i < dataLen; i++ {
149
+//		msg := msgList[i]
150
+//		ip := msg.(*SubscriptionMsg).Message.From
151
+//		Topic := msg.(*SubscriptionMsg).Topic
152
+//		Value := msg.(*SubscriptionMsg).Value
153
+//		operator := msg.(*SubscriptionMsg).operator
154
+//		subnumber := mos.sub_mng.count_sub
155
+//		isSingle := true
156
+//
157
+//		// 0. Check addSubscription
158
+//		err := mos.sub_mng.addSubscription(msg)
159
+//		assert.Equal(t, nil, err)
160
+//
161
+//		// 1. Check if ip mapping is correct
162
+//		assert.Equal(t, subnumber, mos.sub_mng.ip2sub[ip][len(mos.sub_mng.ip2sub[ip])-1], "Ip mapping is failed")
163
+//
164
+//		// 2. Check topicNode
165
+//		topicPtr := mos.sub_mng.list.head
166
+//		for topicPtr != nil {
167
+//			if Compare(Topic, topicPtr.Topic) == 0 {
168
+//				break
169
+//			}
170
+//			topicPtr = topicPtr.next
171
+//		}
172
+//
173
+//		assert.Equal(t, Topic, topicPtr.Topic, "topicNode Add is failed")
174
+//
175
+//		// Check isSingle ?
176
+//		if len(operator) == 3 && operator[1] == "&&" {
177
+//			isSingle = false
178
+//		}
179
+//
180
+//		// 3. Check Value in ValueNode & Check Operator in ValueNode
181
+//		if !isSingle || (len(operator) == 3 && operator[1] == "||") {
182
+//			valptr1 := topicPtr.list.getValueNodePos([]int64{Value[0]})
183
+//			valptr2 := topicPtr.list.getValueNodePos([]int64{Value[1]})
184
+//
185
+//			assert.Equal(t, []int64{Value[0]}, valptr1.val)
186
+//			assert.Equal(t, []int64{Value[1]}, valptr2.val)
187
+//
188
+//			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, operator[0], valptr1))
189
+//			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, operator[2], valptr2))
190
+//
191
+//		} else {
192
+//			valptr := topicPtr.list.getValueNodePos(Value)
193
+//			assert.Equal(t, Value, valptr.val)
194
+//			assert.NotEqual(t, -1, checkOperatorList(isSingle, subnumber, operator[0], valptr))
195
+//		}
196
+//	}
197
+//}
198
+//
199
+//// Test addSubScription(2) (same [Topic, Value] dif [operator])
200
+//func Test_addSubscription_same_topicNvalue(t *testing.T) {
201
+//	rand.Seed(Time.Now().UnixNano())
202
+//
203
+//	// To Init sub_mng
204
+//	s := sub_manager{
205
+//		ip2sub:     make(map[string][]int),
206
+//		sub2node:   make(map[int][]*valueNode),
207
+//		israngesub: make(map[int]bool),
208
+//	}
209
+//
210
+//	mos := Moscato{sub_mng: s}
211
+//
212
+//	// Fix Topic & Value
213
+//	topicLen := rand.Int()%10 + 1
214
+//	staticTopic := make([]int64, topicLen)
215
+//	staticValue := []int64{rand.Int63()}
216
+//
217
+//
218
+//	// Make Data
219
+//	var msgList []MsgUnit
220
+//	dataLen := 100
221
+//	msgList = makeMsgList(dataLen, false)
222
+//
223
+//	for i := 0; i < topicLen; i++ {
224
+//		staticTopic[i] = rand.Int63()
225
+//	}
226
+//
227
+//	// Fix Same Topic & Value
228
+//	for i := 0; i < dataLen; i++ {
229
+//		operator := msgList[i].(*SubscriptionMsg).operator
230
+//		msgList[i].(*SubscriptionMsg).Topic = staticTopic
231
+//
232
+//		if len(operator) == 1{
233
+//			msgList[i].(*SubscriptionMsg).Value = staticValue
234
+//		}
235
+//	}
236
+//
237
+//	// Watch Data set
238
+//	// watchData(msgList, dataLen)
239
+//
240
+//	for i := 0; i < dataLen; i++ {
241
+//		msg := msgList[i]
242
+//		err := mos.sub_mng.addSubscription(msg)
243
+//		assert.Equal(t, nil, err)
244
+//	}
245
+//
246
+//	topicPtr := mos.sub_mng.list.head
247
+//	for topicPtr != nil {
248
+//		if Compare(topicPtr.Topic, staticTopic) == 0 {
249
+//			break
250
+//		}
251
+//		topicPtr = topicPtr.next
252
+//	}
253
+//	watchValueNode(topicPtr.list.head)
254
+//
255
+//}
256
+//
257
+//func watchValueNode(ptr *valueNode) {
258
+//	valPtr := ptr
259
+//	for valPtr != nil{
260
+//		fmt.Println("Value = ", valPtr.val)
261
+//		if len(valPtr.single2sub_s) != 0 {
262
+//			fmt.Println("Single2sub (<) List = ", valPtr.single2sub_s)
263
+//		}
264
+//		if len(valPtr.single2sub_es) != 0 {
265
+//			fmt.Println("Single2sub (<=) List = ", valPtr.single2sub_es)
266
+//		}
267
+//		if len(valPtr.single2sub_b) != 0 {
268
+//			fmt.Println("Single2sub (>) List = ", valPtr.single2sub_b)
269
+//		}
270
+//		if len(valPtr.single2sub_eb) != 0 {
271
+//			fmt.Println("Single2sub (>=) List = ", valPtr.single2sub_eb)
272
+//		}
273
+//		if len(valPtr.single2sub_e) != 0 {
274
+//			fmt.Println("Single2sub (==) List = ", valPtr.single2sub_e)
275
+//		}
276
+//
277
+//		if len(valPtr.range2sub_s) != 0 {
278
+//			fmt.Println("range2sub (<) List = ", valPtr.range2sub_s)
279
+//		}
280
+//		if len(valPtr.range2sub_es) != 0 {
281
+//			fmt.Println("range2sub (<=) List = ", valPtr.range2sub_es)
282
+//		}
283
+//		if len(valPtr.range2sub_b) != 0 {
284
+//			fmt.Println("range2sub (>) List = ", valPtr.range2sub_b)
285
+//		}
286
+//		if len(valPtr.range2sub_eb) != 0 {
287
+//			fmt.Println("range2sub (>=) List = ", valPtr.range2sub_eb)
288
+//		}
289
+//		valPtr = valPtr.next
290
+//	}
291
+//}

Loading…
İptal
Kaydet