Project Moscato Team Messaging Middleware Implemetation Message Middleware by Golang Operate as Secure, Effectively
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package modules
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/fatih/color"
  7. l4g "github.com/jeanphorn/log4go"
  8. "net"
  9. "net/rpc"
  10. )
  11. // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
  12. type Moscato struct {
  13. queue MsgQueue
  14. SendQueue chan myType
  15. MicroServiceManager NodeManager `inject:""`
  16. MatchingManager match_manager
  17. SubscriptionManager sub_manager
  18. SecureManager SecurityManager `inject:""`
  19. }
  20. type myType struct {
  21. subList []string
  22. pubMsg MsgUnit
  23. err error
  24. }
  25. type Reply struct { //RPC리턴값
  26. CompleteLog string //제대로 받았는지 확인하는 로그
  27. }
  28. type Receiver struct { //RPC 서버에 등록하기 위한 변수
  29. moscato *Moscato
  30. }
  31. type Args struct { // 매개변수
  32. JsonMsg []byte
  33. Kind int
  34. }
  35. /*
  36. MS→MM
  37. -MM 실행되면 MM서버는 열려있음(MS은 자동으로 Client)(포트 8150)
  38. -Send2MM 호출 → rpc call로 MmReceive호출해서 MM로 전달(json형식)
  39. -MmReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MM의 Receive로 보냄
  40. -MM의 Receive에서 해당 Message를 처리
  41. */
  42. func (receiver Receiver) MmReceive(args Args, reply *Reply) error { //
  43. // 메세지 별로 나눠서 언마샬
  44. switch args.Kind {
  45. case KSM:
  46. var msg KeyShareMsg
  47. err := json.Unmarshal(args.JsonMsg, &msg)
  48. if err != nil {
  49. return err
  50. }
  51. go func() {
  52. _, err := receiver.moscato.Receive(msg)
  53. if err != nil {
  54. }
  55. }()
  56. reply.CompleteLog = "received"
  57. case PM:
  58. var msg PublishMsg
  59. err := json.Unmarshal(args.JsonMsg, &msg)
  60. if err != nil {
  61. return err
  62. }
  63. go func() {
  64. _, err := receiver.moscato.Receive(msg)
  65. if err != nil {
  66. }
  67. }()
  68. reply.CompleteLog = "PM received"
  69. case SM:
  70. var msg SubscriptionMsg
  71. err := json.Unmarshal(args.JsonMsg, &msg)
  72. if err != nil {
  73. return err
  74. }
  75. go func() {
  76. _, err := receiver.moscato.Receive(msg)
  77. if err != nil {
  78. }
  79. }()
  80. reply.CompleteLog = "SM received"
  81. case RM:
  82. var msg RegisterMsg
  83. err := json.Unmarshal(args.JsonMsg, &msg)
  84. if err != nil {
  85. return err
  86. }
  87. go func() {
  88. _, err := receiver.moscato.Receive(msg)
  89. if err != nil {
  90. fmt.Println(err)
  91. }
  92. }()
  93. reply.CompleteLog = "RM received"
  94. case WM:
  95. var msg WithdrawMsg
  96. err := json.Unmarshal(args.JsonMsg, &msg)
  97. if err != nil {
  98. return err
  99. }
  100. go func() {
  101. _, err := receiver.moscato.Receive(msg)
  102. if err != nil {
  103. }
  104. }()
  105. reply.CompleteLog = "WM received"
  106. default:
  107. return errors.New("message type Error: Not registered message type")
  108. }
  109. //reply.CompleteLog = "received completely"
  110. return nil
  111. }
  112. //Recieve - MM가 MS로부터 메세지 전달받음
  113. func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
  114. l4g.LoadConfiguration("modules/logConfig.json")
  115. //rpc call
  116. var msg_type = msg.CheckType()
  117. //메세지 타입에 따라 다르게 처리
  118. switch msg_type {
  119. case KSM: //Key share msg
  120. case PM: //Publish msg
  121. //log.Println("PM received")
  122. l4g.LOGGER("Test").Info("PM received")
  123. moscato.queue.push(moscato.preProcessMsg(msg))
  124. case SM: //Subscription msg
  125. //log.Println("SM received")
  126. l4g.LOGGER("Test").Info("SM received")
  127. err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
  128. if err != nil {
  129. println(err)
  130. //return nil, err
  131. }
  132. case RM: //Register msg
  133. //log.Println("RM received")
  134. l4g.LOGGER("Test").Info("RM received")
  135. l4g.LOGGER("Test").Debug("RM received")
  136. var newMsg RegisterMsg
  137. newMsg = msg.(RegisterMsg)
  138. newNode := MSNode{newMsg.From, newMsg.From}
  139. resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
  140. if resultAddNode {
  141. //log.Println("Node added successful")
  142. l4g.LOGGER("Test").Info("Node added successful")
  143. } else {
  144. l4g.LOGGER("Test").Error("Node is already added, ignore RM")
  145. //log.Println("Node is already added, ignore RM")
  146. return msg, nil
  147. }
  148. addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
  149. moscato.SecureManager.RegKey(newMsg)
  150. fmt.Println("Registered microservice: address", addr,
  151. "/ key", moscato.SecureManager.GetNodeKey(newMsg.From))
  152. // ackRM 메세지 전송
  153. go moscato.Send2MS(addr, newMsg)
  154. case WM: //Withdraw msg
  155. //ip := msg.(WithdrawMsg).From
  156. //sublist := moscato.SubscriptionManager.ip2sub[ip]
  157. //fmt.Println("prev list = ", sublist)
  158. moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
  159. moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
  160. //moscato.SubscriptionManager.delete(ip)
  161. //sublist2 := moscato.SubscriptionManager.ip2sub[ip]
  162. //fmt.Println("after list =", sublist2)
  163. default:
  164. return nil, errors.New("Message type Error: Not registered message type")
  165. }
  166. return msg, nil
  167. }
  168. //MS로 보낼때 쓸 함수
  169. /*
  170. MM→MS
  171. -MS 실행되면 MS서버는 열려있음(MM은 자동으로 Client)(포트 8160)
  172. -Send2MS 호출 → rpc call로 MsReceive호출해서 MS로 전달(json형식)
  173. -MsReceive에서 msgType 검사 후 그것에 맞게 msgUnit으로 MS의 Receive로 보냄
  174. -MS의 Receive에서 해당 Message를 처리
  175. */
  176. func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
  177. l4g.LoadConfiguration("logConfig.json")
  178. client, err := rpc.Dial("tcp", ipaddress+":8150")
  179. if err != nil {
  180. fmt.Println(err)
  181. return
  182. }
  183. defer client.Close()
  184. reply := new(Reply)
  185. jmsg, _ := msg.ConvertToJson()
  186. args := Args{
  187. JsonMsg: jmsg,
  188. Kind: msg.CheckType(),
  189. }
  190. err = client.Call("Receiver.Receive", args, reply)
  191. if err != nil {
  192. fmt.Println(err)
  193. return
  194. }
  195. //log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
  196. // 마이크로 서비스에게 받은 메시지는 노란색으로 출력
  197. //log.Println(reply.CompleteLog)
  198. l4g.LOGGER("Test").Info(reply.CompleteLog)
  199. }
  200. //Matching을 용이하게 하기위한 메세지 가공 과정
  201. func (moscato *Moscato) preProcessMsg(originalMsg MsgUnit) MsgUnit {
  202. if originalMsg.CheckType() == PM {
  203. pubMsg := originalMsg.(PublishMsg)
  204. for index := 0; index < len(pubMsg.Topic); index++ {
  205. pubMsg.Topic[index] = pubMsg.Topic[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
  206. }
  207. for index := 0; index < len(pubMsg.Value); index++ {
  208. pubMsg.Value[index] = pubMsg.Value[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
  209. }
  210. return pubMsg
  211. } else if originalMsg.CheckType() == SM {
  212. subMsg := originalMsg.(SubscriptionMsg)
  213. for index := 0; index < len(subMsg.Topic); index++ {
  214. subMsg.Topic[index] = subMsg.Topic[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
  215. }
  216. for index := 0; index < len(subMsg.Value); index++ {
  217. subMsg.Value[index] = subMsg.Value[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
  218. }
  219. return subMsg
  220. }
  221. return nil
  222. }
  223. //암호화 해서 보내기
  224. func (moscato *Moscato) SendWithEncrypt() MsgUnit {
  225. for {
  226. mt := <-moscato.SendQueue
  227. if mt.err == nil {
  228. for index := 0; index < len(mt.subList); index++ { //sublist들을 돌면서 매세지를 encrypt하여 메세지 보냄
  229. tmpNode := mt.subList[index]
  230. tmpNodeIpAddr, _ := moscato.MicroServiceManager.GetIpaddr(tmpNode)
  231. //moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode)
  232. //fmt.Println("publish: ", mt.pubMsg)
  233. //moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg)
  234. moscato.Send2MS(tmpNodeIpAddr, moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode))
  235. }
  236. }
  237. return nil
  238. }
  239. }
  240. func (moscato *Moscato) Run() {
  241. config := AppConfig{moscato}
  242. config.config()
  243. //모스카토 구조체 변수 초기화
  244. receiver := Receiver{moscato: moscato}
  245. err := moscato.queue.queue_init()
  246. moscato.SendQueue = make(chan myType)
  247. moscato.SubscriptionManager.Initialize()
  248. if err != nil {
  249. fmt.Println(err)
  250. return
  251. }
  252. //go routine -> matching 동작
  253. go func() {
  254. for {
  255. msg := moscato.queue.pop(true)
  256. go moscato.Matching(msg)
  257. go moscato.SendWithEncrypt()
  258. }
  259. }()
  260. //go moscato.CheckQueue()
  261. //rpc 등록 -> Receive 함수
  262. err = rpc.Register(receiver)
  263. if err != nil {
  264. println(err)
  265. return
  266. }
  267. go Listen()
  268. color.Blue("initializing complete.")
  269. fmt.Scanln()
  270. }
  271. func Listen() {
  272. /*
  273. MS→MM일때 ⇒ port : 8160으로 열기
  274. (MM이 Server, MS가 Client)
  275. */
  276. l, err1 := net.Listen("tcp", ":8160")
  277. if err1 != nil {
  278. //log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
  279. l4g.LOGGER("Test").Critical("Unable to listen on given port: %s", err1)
  280. }
  281. defer l.Close()
  282. for {
  283. conn, _ := l.Accept()
  284. go rpc.ServeConn(conn)
  285. }
  286. }