Project Moscato Team — Messaging Middleware Implementation: a message middleware written in Go, designed to operate securely and efficiently.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package modules
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "net"
  8. "net/rpc"
  9. )
  10. //temporary type for matching manager
  11. //type match_manager struct{}
  12. func (match_mng *match_manager) matching(queue *MsgQueue) {
  13. //msg := queue.pop(true)
  14. //Implement here ~~
  15. }
  16. func (match_mng *match_manager) add_subscription(msg MsgUnit) {
  17. }
  // secure_manager is a temporary placeholder type for the secure (key)
  // manager; it has no fields or behavior yet.
  type secure_manager struct{}
  20. type Moscato struct {
  21. queue MsgQueue
  22. ms_mng MStable
  23. match_mng match_manager
  24. secure_mng secure_manager
  25. }
  // Reply is the RPC response value. CompleteLog carries a short status text
  // that the receiving side fills in once a message has been accepted.
  type Reply struct {
  	CompleteLog string
  }
  29. type Receiver struct {
  30. moscato *Moscato
  31. }
  // Args is the RPC parameter value: a JSON-encoded message together with the
  // kind code that tells the receiver which message type to decode it into.
  type Args struct {
  	JsonMsg []byte // JSON encoding of the message
  	Kind    int    // message kind code used to pick the concrete type
  }
  36. func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
  37. // 메세지 별로 나눠서 언마샬하면 됨
  38. switch args.Kind {
  39. case KSM:
  40. var msg KeyShareMsg
  41. err:=json.Unmarshal(args.JsonMsg,&msg)
  42. if err != nil {
  43. return err
  44. }
  45. go func() {
  46. _, err := receiver.moscato.Receive(msg)
  47. if err != nil {
  48. }
  49. }()
  50. reply.CompleteLog = "received completely"
  51. case PM:
  52. var msg PublishMsg
  53. err:=json.Unmarshal(args.JsonMsg,&msg)
  54. if err != nil {
  55. return err
  56. }
  57. go func() {
  58. _, err := receiver.moscato.Receive(msg)
  59. if err != nil {
  60. }
  61. }()
  62. reply.CompleteLog = "received completely"
  63. case SM:
  64. var msg SubscriptionMsg
  65. err:=json.Unmarshal(args.JsonMsg,&msg)
  66. if err != nil {
  67. return err
  68. }
  69. go func() {
  70. _, err := receiver.moscato.Receive(msg)
  71. if err != nil {
  72. }
  73. }()
  74. reply.CompleteLog = "received completely"
  75. case RM:
  76. var msg RegisterMsg
  77. err:=json.Unmarshal(args.JsonMsg,&msg)
  78. if err != nil {
  79. return err
  80. }
  81. go func() {
  82. _, err := receiver.moscato.Receive(msg)
  83. if err != nil {
  84. }
  85. }()
  86. reply.CompleteLog = "received completely"
  87. case WM:
  88. var msg WithdrawMsg
  89. err:=json.Unmarshal(args.JsonMsg,&msg)
  90. if err != nil {
  91. return err
  92. }
  93. go func() {
  94. _, err := receiver.moscato.Receive(msg)
  95. if err != nil {
  96. }
  97. }()
  98. reply.CompleteLog = "received completely"
  99. default:
  100. return errors.New("Message type Error: Not registered message type")
  101. }
  102. reply.CompleteLog = "received completely"
  103. return nil
  104. }
  105. //Recieve - MM가 msg전달 받음
  106. func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
  107. //rpc call
  108. var msg_type = msg.CheckType()
  109. //메세지 타입에 따라 다르게 처리
  110. switch msg_type {
  111. case KSM: //Key share msg
  112. case PM: //Publish msg
  113. moscato.queue.push(msg.(PublishMsg))
  114. log.Println("popped queue: ", moscato.queue.pop(false))
  115. case SM: //Subscription msg
  116. moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
  117. case RM: //Register msg
  118. //var newmsg RegisterMsg
  119. var newmsg = msg.(*RegisterMsg)
  120. newNode := MSnode{newmsg.From, newmsg.From}
  121. moscato.ms_mng.add_microservice(newNode)
  122. case WM: //Withdraw msg
  123. moscato.ms_mng.remove_microservice(msg.(*WithdrawMsg).From)
  124. default:
  125. return nil, errors.New("Message type Error: Not registered message type")
  126. }
  127. return msg, nil
  128. }
  129. //Ms로 보낼때 쓸 함수
  130. func (moscato *Moscato)Send2MS(ipaddress string,msg MsgUnit){
  131. client,err:=rpc.Dial("tcp",ipaddress+":8150")
  132. if err!=nil{
  133. fmt.Println(err)
  134. return
  135. }
  136. defer client.Close()
  137. reply:=new(Reply)
  138. jmsg,_:=msg.ConvertToJson()
  139. args:=Args{
  140. JsonMsg: jmsg,
  141. Kind: msg.CheckType(),
  142. }
  143. err:=client.Call("receiver.MsReceive",args,reply)
  144. if err !=nil{
  145. fmt.Println(err)
  146. return
  147. }
  148. fmt.Println(reply.CompleteLog)//잘 받았는지 확인 해줌
  149. }
  150. func (moscato *Moscato) Run() {
  151. //모스카토 구조체 변수 초기화
  152. receiver := Receiver{moscato: moscato}
  153. moscato.queue.queue_init()
  154. //go routine -> matching 동작
  155. go moscato.match_mng.matching(&moscato.queue)
  156. //rpc 등록 -> Receive 함수
  157. err := rpc.Register(receiver)
  158. if err != nil {
  159. log.Println(err)
  160. return
  161. }
  162. Listen()
  163. log.Println("listen complete.")
  164. fmt.Scanln()
  165. }
  // Listen opens a TCP listener on port 8160, the port on which messages from
  // microservices arrive, and serves every accepted connection with the
  // default net/rpc server, each on its own goroutine.
  //
  // If the port cannot be opened the process exits through log.Fatalf.
  // Otherwise Listen blocks until accepting a connection fails, at which point
  // the error is logged by net/rpc, the listener is closed, and Listen returns.
  // A second listener on port 8150 was sketched in comments but never enabled.
  func Listen() {
  	l, err := net.Listen("tcp", ":8160")
  	if err != nil {
  		log.Fatalf("Unable to listen on given port: %s", err)
  	}
  	defer l.Close()

  	// rpc.Accept checks the Accept error before serving, so a failed accept
  	// no longer hands a nil connection to ServeConn or spins the loop.
  	rpc.Accept(l)
  }