|
|
@@ -117,11 +117,45 @@ func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
|
|
117
|
117
|
return nil
|
|
118
|
118
|
}
|
|
119
|
119
|
|
|
120
|
|
-func (moscato *Moscato) CheckQueue() MsgUnit {
|
|
121
|
|
- for {
|
|
122
|
|
- pmMsg := moscato.queue.pop(true)
|
|
123
|
|
- fmt.Println("queue popped : ", pmMsg)
|
|
|
120
|
+func (moscato *Moscato) preProcessMsg(originalMsg MsgUnit) MsgUnit {
|
|
|
121
|
+ if originalMsg.CheckType() == PM {
|
|
|
122
|
+ pubMsg := originalMsg.(PublishMsg)
|
|
|
123
|
+ for index := 0; index < len(pubMsg.Topic); index++ {
|
|
|
124
|
+ pubMsg.Topic[index] = pubMsg.Topic[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
|
|
|
125
|
+ }
|
|
|
126
|
+ for index := 0; index < len(pubMsg.Value); index++ {
|
|
|
127
|
+ pubMsg.Value[index] = pubMsg.Value[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
|
|
|
128
|
+ }
|
|
|
129
|
+ fmt.Println("preprocess : ", pubMsg)
|
|
|
130
|
+ return pubMsg
|
|
|
131
|
+ } else if originalMsg.CheckType() == SM {
|
|
|
132
|
+ subMsg := originalMsg.(SubscriptionMsg)
|
|
|
133
|
+ for index := 0; index < len(subMsg.Topic); index++ {
|
|
|
134
|
+ subMsg.Topic[index] = subMsg.Topic[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
|
|
|
135
|
+ }
|
|
|
136
|
+ for index := 0; index < len(subMsg.Value); index++ {
|
|
|
137
|
+ subMsg.Value[index] = subMsg.Value[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
|
|
|
138
|
+ }
|
|
|
139
|
+ fmt.Println("preprocess : ", subMsg)
|
|
|
140
|
+ return subMsg
|
|
124
|
141
|
}
|
|
|
142
|
+ return nil
|
|
|
143
|
+}
|
|
|
144
|
+
|
|
|
145
|
+func (moscato *Moscato) SendWithEncrypt() MsgUnit {
|
|
|
146
|
+ mt := <-moscato.SendQueue
|
|
|
147
|
+ fmt.Println(mt)
|
|
|
148
|
+ if mt.err == nil {
|
|
|
149
|
+ for index := 0; index < len(mt.subList); index++ {
|
|
|
150
|
+ tmpNode := mt.subList[index]
|
|
|
151
|
+ tmpNodeIpAddr, _ := moscato.MicroServiceManager.GetIpaddr(tmpNode)
|
|
|
152
|
+ fmt.Println("before reEnc: ", mt.pubMsg)
|
|
|
153
|
+ moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode)
|
|
|
154
|
+ fmt.Println("after reEnc: ", mt.pubMsg)
|
|
|
155
|
+ moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg)
|
|
|
156
|
+ }
|
|
|
157
|
+ }
|
|
|
158
|
+ return nil
|
|
125
|
159
|
}
|
|
126
|
160
|
|
|
127
|
161
|
//Recieve - MM가 msg전달 받음
|
|
|
@@ -136,11 +170,11 @@ func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
|
|
136
|
170
|
case PM: //Publish msg
|
|
137
|
171
|
log.Println("PM received")
|
|
138
|
172
|
fmt.Println("PM: ", msg)
|
|
139
|
|
- moscato.queue.push(msg.(PublishMsg))
|
|
|
173
|
+ moscato.queue.push(moscato.preProcessMsg(msg))
|
|
140
|
174
|
|
|
141
|
175
|
case SM: //Subscription msg
|
|
142
|
176
|
fmt.Println("SM: ", msg)
|
|
143
|
|
- moscato.SubscriptionManager.addSubscription(msg)
|
|
|
177
|
+ moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
|
|
144
|
178
|
|
|
145
|
179
|
case RM: //Register msg
|
|
146
|
180
|
log.Println("RM received")
|
|
|
@@ -217,6 +251,7 @@ func (moscato *Moscato) Run() {
|
|
217
|
251
|
|
|
218
|
252
|
//go routine -> matching 동작
|
|
219
|
253
|
go moscato.Matching()
|
|
|
254
|
+ go moscato.SendWithEncrypt()
|
|
220
|
255
|
|
|
221
|
256
|
//go moscato.CheckQueue()
|
|
222
|
257
|
|