2 Révisions

Auteur SHA1 Message Date
  kimsungho 5771ffce09 Write frame at init.go il y a 4 ans
  kimsungho 6268bb73a0 Revise message.go/queue.go il y a 4 ans
5 fichiers modifiés avec 230 ajouts et 20 suppressions
  1. 3
    2
      main.go
  2. 92
    0
      modules/init.go
  3. 37
    1
      modules/manage.go
  4. 90
    9
      modules/message.go
  5. 8
    8
      modules/queue.go

+ 3
- 2
main.go Voir le fichier

@@ -1,7 +1,8 @@
1 1
 package main
2 2
 
3
-import "fmt"
3
+import "modules/Moscato_Messaging_Middleware/modules"
4 4
 
5 5
 func main() {
6
-	fmt.Printf("??")
6
+	moscato := modules.Moscato{}
7
+	moscato.Run()
7 8
 }

+ 92
- 0
modules/init.go Voir le fichier

@@ -1 +1,93 @@
1 1
 package modules
2
+
3
+import (
4
+	"errors"
5
+	"fmt"
6
+	"log"
7
+	"net"
8
+	"net/rpc"
9
+)
10
+
11
+//temporary type for matching manager
12
+type match_manager struct{}
13
+
14
+func (match_mng *match_manager) matching(queue *MsgQueue) {
15
+	//msg := queue.pop(true)
16
+	//Implement here ~~
17
+}
18
+
19
+func (match_mng *match_manager) add_subscription(msg MsgUnit) {
20
+
21
+}
22
+
23
+//temporary type for secure(key) manager
24
+type secure_manager struct{}
25
+
26
+type Moscato struct {
27
+	queue      MsgQueue
28
+	ms_mng     MStable
29
+	match_mng  match_manager
30
+	secure_mng secure_manager
31
+}
32
+
33
+// Send - rpc를 이용하여 msg전송
34
+func Send(ipaddr string, message MsgUnit) error {
35
+	return nil
36
+}
37
+
38
+// Recieve - rpc를 이용하여 msg전달 받음(rpc call)
39
+func (moscato *Moscato) Recieve(msg MsgUnit) (MsgUnit, error) {
40
+	msg_type := msg.CheckType()
41
+
42
+	//메세지 타입에 따라 다르게 처리
43
+	switch msg_type {
44
+
45
+	case KSM: //Key share msg
46
+
47
+	case PM: //Publish msg
48
+		moscato.queue.push(msg.(*PublishMsg))
49
+
50
+	case SM: //Subscription msg
51
+		moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
52
+
53
+	case RM: //Register msg
54
+		//var newmsg RegisterMsg
55
+		var newmsg = msg.(*RegisterMsg)
56
+		newNode := MSnode{newmsg.from, newmsg.from}
57
+		moscato.ms_mng.add_microservice(newNode)
58
+
59
+	case WM: //Withdraw msg
60
+		moscato.ms_mng.remove_microservice(msg.(*WithdrawMsg).from)
61
+
62
+	default:
63
+		return nil, errors.New("Message type Error: Not registered message type")
64
+	}
65
+
66
+	return msg, nil
67
+}
68
+
69
+func (moscato *Moscato) Run() {
70
+	//모스카토 구조체 변수 초기화
71
+	moscato.queue.queue_init()
72
+
73
+	//go routine -> matching 동작
74
+	go moscato.match_mng.matching(&moscato.queue)
75
+
76
+	//rpc 등록 -> Receive함수
77
+	rpc.Register(moscato)
78
+	moscato.Listen()
79
+}
80
+
81
+func (moscato *Moscato) Listen() {
82
+	l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
83
+
84
+	if err != nil {
85
+		log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err))
86
+	}
87
+	defer l.Close()
88
+
89
+	for {
90
+		conn, _ := l.Accept()
91
+		go rpc.ServeConn(conn)
92
+	}
93
+}

+ 37
- 1
modules/manage.go Voir le fichier

@@ -15,5 +15,41 @@ func (node *MSnode) Getipaddr() string {
15 15
 
16 16
 //모든 Microservice정보 저장
17 17
 type MStable struct {
18
-	IpTable map[string]MSnode
18
+	NodeTable map[string]MSnode
19
+}
20
+
21
+func (manager *MStable) getIpaddr(nodename string) (string, bool) {
22
+	//해당 이름의 노드이름이 존재하는지 확인
23
+	node, exists := manager.NodeTable[nodename]
24
+
25
+	//존재하지 않는 경우 nil리턴
26
+	if !exists {
27
+		return "", false
28
+	} else {
29
+		return node.ipAddr, true
30
+	}
31
+}
32
+
33
+func (manager *MStable) add_microservice(node MSnode) bool {
34
+	//삽입 전 존재여부 확인
35
+	_, exists := manager.NodeTable[node.Getname()]
36
+
37
+	if exists {
38
+		return false
39
+	} else {
40
+		manager.NodeTable[node.Getname()] = node
41
+		return true
42
+	}
43
+}
44
+
45
+func (manager *MStable) remove_microservice(nodename string) bool {
46
+	//삭제 전 존재여부 확인
47
+	_, exists := manager.NodeTable[nodename]
48
+
49
+	if exists {
50
+		delete(manager.NodeTable, nodename)
51
+		return true
52
+	} else {
53
+		return false
54
+	}
19 55
 }

+ 90
- 9
modules/message.go Voir le fichier

@@ -1,45 +1,61 @@
1 1
 package modules
2 2
 
3
+import "encoding/json"
4
+
5
+//import "strconv"
6
+//*****메세지 타입 상수화
7
+const (
8
+	KGM = 1 + iota
9
+	KSM
10
+	PM
11
+	SM
12
+	RM
13
+	WM
14
+)
15
+
3 16
 //*****메세지 틀*****
17
+
4 18
 type Message struct {
5
-	from    string
19
+	from    string //ip주소
6 20
 	version string
7 21
 	time    string
8
-	kine    string
9
-	content MsgUnit
22
+	kind    int //종류
10 23
 }
11 24
 
12 25
 type MsgUnit interface {
13 26
 	// ConvertToJson - send 전 json형식으로 바꾸는 함수
14 27
 	ConvertToJson() ([]byte, error)
15
-	// Send - rpc를 이용하여 msg전송
16
-	Send() (Message, error)
17
-	// Recieve - rpc를 이용하여 msg전달 받음(rpc call)
18
-	Recieve() (Message, error)
28
+	// CheckType - Message의 타입을 알려줌
29
+	CheckType() int
30
+	// SetType - Message 객체가 생성 되었을때 종류 정하기
31
+	//SetType(int)
19 32
 }
20 33
 
21
-//**********************
22
-
23 34
 //*****각 메세지 형식 및 정의**********
24 35
 
25 36
 //KeyGen 명령 메세지
26 37
 type KeyGenMsg struct {
27 38
 	Message
39
+	iptable []string
28 40
 }
29 41
 
30 42
 //Key공유 메세지
31 43
 type KeyShareMsg struct {
32 44
 	Message
45
+	key string
33 46
 }
34 47
 
35 48
 //전달할 내용을 담은 메세지
36 49
 type PublishMsg struct {
37 50
 	Message
51
+	subscription []int64 //선호도
52
+	content      []int64 // 내용
38 53
 }
39 54
 
40 55
 //구독 정보를 담은 메세지
41 56
 type SubscriptionMsg struct {
42 57
 	Message
58
+	subscription []int64 //선호도
43 59
 }
44 60
 
45 61
 //Microservice 등록 메세지
@@ -53,3 +69,68 @@ type WithdrawMsg struct {
53 69
 }
54 70
 
55 71
 //**************************
72
+
73
+func (msg *KeyGenMsg) ConvertToJson() ([]byte, error) {
74
+	js := msg
75
+	jsonBytes, err := json.Marshal(js)
76
+	return jsonBytes, err
77
+}
78
+
79
+func (msg *KeyShareMsg) ConvertToJson() ([]byte, error) {
80
+	js := msg
81
+	jsonBytes, err := json.Marshal(js)
82
+	return jsonBytes, err
83
+}
84
+
85
+func (msg *PublishMsg) ConvertToJson() ([]byte, error) {
86
+	js := msg
87
+	jsonBytes, err := json.Marshal(js)
88
+	return jsonBytes, err
89
+}
90
+
91
+func (msg *SubscriptionMsg) ConvertToJson() ([]byte, error) {
92
+	js := msg
93
+	jsonBytes, err := json.Marshal(js)
94
+	return jsonBytes, err
95
+}
96
+
97
+func (msg *RegisterMsg) ConvertToJson() ([]byte, error) {
98
+	js := msg
99
+	jsonBytes, err := json.Marshal(js)
100
+	return jsonBytes, err
101
+}
102
+
103
+func (msg *WithdrawMsg) ConvertToJson() ([]byte, error) {
104
+	js := msg
105
+	jsonBytes, err := json.Marshal(js)
106
+	return jsonBytes, err
107
+}
108
+
109
+func (msg Message) CheckType() int {
110
+	return msg.kind
111
+}
112
+
113
+/*
114
+func (msg Message)SetType(t int){
115
+   msg.kind=t
116
+}
117
+
118
+func NewMessage() *Message{
119
+	//m:=&Message{
120
+	//from: ip
121
+	//version: version
122
+	//time : time
123
+	//kind : kind(종류)
124
+	//}
125
+}
126
+*/
127
+
128
+//*********************************
129
+
130
+// Send - rpc를 이용하여 msg전송
131
+//func (msg *MsgUnit) Send(ipaddr string, message MsgUnit) (error){}
132
+
133
+// Recieve - rpc를 이용하여 msg전달 받음(rpc call)
134
+//func Recieve(msg *MsgUnit) (MsgUnit, error){ }
135
+
136
+//*********************************

+ 8
- 8
modules/queue.go Voir le fichier

@@ -3,7 +3,7 @@ package modules
3 3
 import "errors"
4 4
 
5 5
 type MsgQueue struct {
6
-	queue     chan Message
6
+	queue     chan MsgUnit
7 7
 	QueueFunc QueueOperate
8 8
 }
9 9
 
@@ -20,29 +20,29 @@ func (mq *MsgQueue) queue_init() error {
20 20
 	if mq.queue != nil && len(mq.queue) != 0 {
21 21
 		return errors.New("Queue Hadlerer Error: Already initialized.")
22 22
 	} else if mq.queue == nil {
23
-		mq.queue = make(chan Message)
23
+		mq.queue = make(chan MsgUnit, 1000)
24 24
 		return nil
25 25
 	} else {
26 26
 		close(mq.queue)
27
-		mq.queue = make(chan Message)
27
+		mq.queue = make(chan MsgUnit)
28 28
 		return nil
29 29
 	}
30 30
 }
31 31
 
32
-func (mq *MsgQueue) push(msg Message) bool {
32
+func (mq *MsgQueue) push(msg MsgUnit) bool {
33 33
 	mq.queue <- msg
34 34
 	return true
35 35
 }
36 36
 
37
-func (mq *MsgQueue) pop(wait bool) (Message, error) {
37
+func (mq *MsgQueue) pop(wait bool) MsgUnit {
38 38
 	if wait == true {
39 39
 		if len(mq.queue) == 0 {
40
-			return Message{}, errors.New("Queue Handler Alert: Queue is Empty")
40
+			return nil
41 41
 		} else {
42
-			return <-mq.queue, nil
42
+			return <-mq.queue
43 43
 		}
44 44
 	} else {
45 45
 		//queue에 데이터가 들어올 때까지 block
46
-		return <-mq.queue, nil
46
+		return <-mq.queue
47 47
 	}
48 48
 }

Chargement…
Annuler
Enregistrer