Project Moscato Team Messaging Middleware Implemetation Message Middleware by Golang Operate as Secure, Effectively
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

init.go 5.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package modules
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/facebookgo/inject"
  7. "log"
  8. "net"
  9. "net/rpc"
  10. )
  11. // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현
  12. //temporary type for matching manager
  13. //type match_manager struct{}
  14. func (match_mng *match_manager) matching(queue *MsgQueue) {
  15. //msg := queue.pop(true)
  16. //Implement here ~~
  17. }
  18. func (match_mng *match_manager) add_subscription(msg MsgUnit) {
  19. }
  20. //temporary type for secure(key) manager
  21. type secure_manager struct{}
  22. type Moscato struct {
  23. queue MsgQueue
  24. MicroServiceManager NodeManager `inject:""`
  25. match_mng match_manager
  26. secure_mng secure_manager
  27. }
  28. type Reply struct {
  29. CompleteLog string
  30. }
  31. type Receiver struct {
  32. moscato *Moscato
  33. }
  34. type Args struct { // 매개변수
  35. JsonMsg []byte
  36. Kind int
  37. }
  38. func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
  39. // 메세지 별로 나눠서 언마샬하면 됨
  40. switch args.Kind {
  41. case KSM:
  42. var msg KeyShareMsg
  43. err := json.Unmarshal(args.JsonMsg, &msg)
  44. if err != nil {
  45. return err
  46. }
  47. go func() {
  48. _, err := receiver.moscato.Receive(msg)
  49. if err != nil {
  50. }
  51. }()
  52. reply.CompleteLog = "received completely"
  53. case PM:
  54. var msg PublishMsg
  55. err := json.Unmarshal(args.JsonMsg, &msg)
  56. if err != nil {
  57. return err
  58. }
  59. go func() {
  60. _, err := receiver.moscato.Receive(msg)
  61. if err != nil {
  62. }
  63. }()
  64. reply.CompleteLog = "PM received completely"
  65. case SM:
  66. var msg SubscriptionMsg
  67. err := json.Unmarshal(args.JsonMsg, &msg)
  68. if err != nil {
  69. return err
  70. }
  71. go func() {
  72. _, err := receiver.moscato.Receive(msg)
  73. if err != nil {
  74. }
  75. }()
  76. reply.CompleteLog = "received completely"
  77. case RM:
  78. var msg RegisterMsg
  79. err := json.Unmarshal(args.JsonMsg, &msg)
  80. if err != nil {
  81. return err
  82. }
  83. go func() {
  84. _, err := receiver.moscato.Receive(msg)
  85. if err != nil {
  86. fmt.Println(err)
  87. }
  88. }()
  89. reply.CompleteLog = "RM received completely"
  90. case WM:
  91. var msg WithdrawMsg
  92. err := json.Unmarshal(args.JsonMsg, &msg)
  93. if err != nil {
  94. return err
  95. }
  96. go func() {
  97. _, err := receiver.moscato.Receive(msg)
  98. if err != nil {
  99. }
  100. }()
  101. reply.CompleteLog = "received completely"
  102. default:
  103. return errors.New("message type Error: Not registered message type")
  104. }
  105. //reply.CompleteLog = "received completely"
  106. return nil
  107. }
  108. func (moscato *Moscato) CheckQueue() MsgUnit {
  109. for {
  110. pmMsg := moscato.queue.pop(true)
  111. fmt.Println("queue popped : ", pmMsg)
  112. }
  113. }
  114. //Recieve - MM가 msg전달 받음
  115. func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
  116. //rpc call
  117. var msg_type = msg.CheckType()
  118. //메세지 타입에 따라 다르게 처리
  119. switch msg_type {
  120. case KSM: //Key share msg
  121. case PM: //Publish msg
  122. moscato.queue.push(msg.(PublishMsg))
  123. log.Println("PM received")
  124. case SM: //Subscription msg
  125. moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
  126. case RM: //Register msg
  127. var newMsg RegisterMsg
  128. newMsg = msg.(RegisterMsg)
  129. newNode := MSNode{newMsg.From, newMsg.From}
  130. moscato.MicroServiceManager.AddMicroservice(newNode)
  131. addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
  132. fmt.Println(addr)
  133. log.Println("RM received")
  134. case WM: //Withdraw msg
  135. moscato.MicroServiceManager.RemoveMicroservice(msg.(*WithdrawMsg).From)
  136. default:
  137. return nil, errors.New("Message type Error: Not registered message type")
  138. }
  139. return msg, nil
  140. }
  141. //Ms로 보낼때 쓸 함수
  142. func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
  143. client, err := rpc.Dial("tcp", ipaddress+":8150")
  144. if err != nil {
  145. fmt.Println(err)
  146. return
  147. }
  148. defer client.Close()
  149. reply := new(Reply)
  150. jmsg, _ := msg.ConvertToJson()
  151. args := Args{
  152. JsonMsg: jmsg,
  153. Kind: msg.CheckType(),
  154. }
  155. err = client.Call("receiver.MsReceive", args, reply)
  156. if err != nil {
  157. fmt.Println(err)
  158. return
  159. }
  160. fmt.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
  161. }
  162. func (moscato *Moscato) Run() {
  163. var graph inject.Graph
  164. err := graph.Provide(
  165. &inject.Object{Value: NewMStable()},
  166. &inject.Object{Value: moscato})
  167. if err != nil {
  168. fmt.Println(err)
  169. return
  170. }
  171. err = graph.Populate()
  172. if err != nil {
  173. fmt.Println(err)
  174. return
  175. }
  176. //모스카토 구조체 변수 초기화
  177. receiver := Receiver{moscato: moscato}
  178. err = moscato.queue.queue_init()
  179. if err != nil {
  180. fmt.Println(err)
  181. return
  182. }
  183. //moscato.MicroServiceManager = NewMStable()
  184. //go routine -> matching 동작
  185. go moscato.match_mng.matching(&moscato.queue)
  186. go moscato.CheckQueue()
  187. //rpc 등록 -> Receive 함수
  188. err = rpc.Register(receiver)
  189. if err != nil {
  190. log.Println(err)
  191. return
  192. }
  193. Listen()
  194. log.Println("listen complete.")
  195. fmt.Scanln()
  196. }
  197. func Listen() {
  198. //l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
  199. l, err1 := net.Listen("tcp", ":8160") //MS로 부터 받는거
  200. //l2, err2 := net.Listen("tcp","0.0.0.0:8150")
  201. if err1 != nil {
  202. log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
  203. }
  204. /*defer l1.Close()
  205. if err2 != nil {
  206. log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err2))
  207. }
  208. defer l2.Close()*/
  209. for {
  210. conn, _ := l.Accept()
  211. go rpc.ServeConn(conn)
  212. }
  213. /*for {
  214. conn, _ := l2.Accept()
  215. go rpc.ServeConn(conn)
  216. }*/
  217. }