Bläddra i källkod

[fin]init.go rpc & reassign MM_Receive

master
jaehoon_kim 4 år sedan
förälder
incheckning
b89cd49710
1 ändrade filer med 79 tillägg och 14 borttagningar
  1. 79
    14
      src/broker/modules/init.go

+ 79
- 14
src/broker/modules/init.go Visa fil

@@ -23,6 +23,10 @@ func (match_mng *match_manager) add_subscription(msg MsgUnit) {
23 23
 //temporary type for secure(key) manager
24 24
 type secure_manager struct{}
25 25
 
26
+type Microservice struct {
27
+
28
+}
29
+
26 30
 type Moscato struct {
27 31
 	queue      MsgQueue
28 32
 	ms_mng     MStable
@@ -34,9 +38,6 @@ type Reply struct {
34 38
 	ReceiveMsg MsgUnit
35 39
 }
36 40
 
37
-type Listner int
38
-
39
-
40 41
 //MS에서 보내는거-rpc call 사용
41 42
 /*func (l *Listner)MM_Receive(msg MsgUnit, reply *Reply)error{
42 43
 	reply.ReceiveMsg=msg
@@ -44,41 +45,68 @@ type Listner int
44 45
 	return nil
45 46
 }*/
46 47
 
47
-func (moscato *Moscato)MM_Receive(msg MsgUnit, reply *Reply)error{
48
-	reply.ReceiveMsg=msg
49
-  	go moscato.Receive(reply)
48
+func (moscato *Moscato)MM_Receive(msg MsgUnit)error{
49
+  	go moscato.Receive(msg)
50 50
 	return nil
51 51
 }
52 52
 
53
+
53 54
 //Recieve - MM가 msg전달 받음
54
-func (moscato *Moscato)Receive(reply *Reply) (MsgUnit, error) {
55
+func (moscato *Moscato)Receive(msg MsgUnit) (MsgUnit, error) {
55 56
 	//rpc call
56
-	var msg_type = reply.ReceiveMsg.CheckType()
57
+	var msg_type = msg.CheckType()
57 58
 	//메세지 타입에 따라 다르게 처리
58 59
 	switch msg_type {
59 60
 
60 61
 	case KSM: //Key share msg
61 62
 
62 63
 	case PM: //Publish msg
63
-		moscato.queue.push(reply.ReceiveMsg.(*PublishMsg))
64
+		moscato.queue.push(msg.(*PublishMsg))
64 65
 
65 66
 	case SM: //Subscription msg
66
-		moscato.match_mng.add_subscription(reply.ReceiveMsg.(*SubscriptionMsg))
67
+		moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
67 68
 
68 69
 	case RM: //Register msg
69 70
 		//var newmsg RegisterMsg
70
-		var newmsg = reply.ReceiveMsg.(*RegisterMsg)
71
+		var newmsg = msg.(*RegisterMsg)
71 72
 		newNode := MSnode{newmsg.From(), newmsg.From()}
72 73
 		moscato.ms_mng.add_microservice(newNode)
73 74
 
74 75
 	case WM: //Withdraw msg
75
-		moscato.ms_mng.remove_microservice(reply.ReceiveMsg.(*WithdrawMsg).From())
76
+		moscato.ms_mng.remove_microservice(msg.(*WithdrawMsg).From())
76 77
 
77 78
 	default:
78 79
 		return nil, errors.New("Message type Error: Not registered message type")
79 80
 	}
80 81
 
81
-	return reply.ReceiveMsg, nil
82
+	return msg, nil
83
+}
84
+
85
+func handleSum(ln net.Listener){
86
+	for {
87
+		conn, err := ln.Accept() // 클라이언트가 연결되면 TCP 연결을 리턴
88
+		if err != nil {
89
+			continue
90
+		}
91
+
92
+		defer func(conn net.Conn) {
93
+			err := conn.Close()
94
+			if err != nil {
95
+
96
+			}
97
+		}(conn)             // main 함수가 끝나기 직전에 TCP 연결을 닫음
98
+
99
+		go rpc.ServeConn(conn) // RPC를 처리하는 함수를 고루틴으로 실행
100
+	}
101
+}
102
+
103
+func (moscato *Moscato)callRPC(client *rpc.Client, msg MsgUnit){
104
+	reply:=new(Reply)
105
+	err := client.Call("MicroService.MS_Receive", msg,reply) // Calc.Sum 함수 호출
106
+	if err != nil {
107
+		fmt.Println(err)
108
+		return
109
+	}
82 110
 }
83 111
 
84 112
 
@@ -90,8 +118,45 @@ func (moscato *Moscato) Run() {
90 118
 	go moscato.match_mng.matching(&moscato.queue)
91 119
 
92 120
 	//rpc 등록 -> Receive함수
93
-	rpc.Register(moscato)
94 121
 	moscato.Listen()
122
+
123
+	err := rpc.Register(new(Microservice))
124
+	if err != nil {
125
+		return
126
+	} // Calc 타입의 인스턴스를 생성하여 RPC 서버에 등록
127
+	ln, err := net.Listen("tcp", ":6000") // TCP 프로토콜에 6000번 포트로 연결을 받음 local 나중에 8150
128
+	ln1, err1 := net.Listen("tcp", ":6001") // TCP 프로토콜에 6000번 포트로 연결을 받음 나중에 8160
129
+
130
+	if err != nil {
131
+		fmt.Println(err)
132
+		return
133
+	}
134
+	if err1 != nil {
135
+		fmt.Println(err)
136
+		return
137
+	}
138
+	defer func(ln net.Listener) {
139
+		err := ln.Close()
140
+		if err != nil {
141
+
142
+		}
143
+	}(ln)             // main 함수가 종료되기 직전에 연결 대기를 닫음
144
+	defer func(ln1 net.Listener) {
145
+		err := ln1.Close()
146
+		if err != nil {
147
+
148
+		}
149
+	}(ln1) // main 함수가 종료되기 직전에 연결 대기를 닫음
150
+
151
+	/*client1, err := rpc.Dial("tcp", "127.0.0.1:6000") // RPC 서버에 연결
152
+	if err != nil {
153
+		fmt.Println(err)
154
+		return
155
+	}*/
156
+
157
+	go handleSum(ln)
158
+	go handleSum(ln1)
159
+	fmt.Scanln() // main 함수가 종료되지 않도록 대기
95 160
 }
96 161
 
97 162
 func (moscato *Moscato) Listen() {

Laddar…
Avbryt
Spara