Project Moscato Team Messaging Middleware Implemetation Message Middleware by Golang Operate as Secure, Effectively
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package modules
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "net"
  7. "net/rpc"
  8. )
  9. //temporary type for matching manager
  10. type match_manager struct{}
  11. func (match_mng *match_manager) matching(queue *MsgQueue) {
  12. //msg := queue.pop(true)
  13. //Implement here ~~
  14. }
  15. func (match_mng *match_manager) add_subscription(msg MsgUnit) {
  16. }
  17. //temporary type for secure(key) manager
  18. type secure_manager struct{}
  19. type Moscato struct {
  20. queue MsgQueue
  21. ms_mng MStable
  22. match_mng match_manager
  23. secure_mng secure_manager
  24. }
  25. // Send - rpc를 이용하여 msg전송
  26. func (moscato *Moscato)Send(ipaddr string, message MsgUnit, reply []byte)([]byte, error) {
  27. reply,err:=message.ConvertToJson()
  28. return reply,err
  29. }
  30. // Recieve - rpc를 이용하여 msg전달 받음(rpc call)
  31. func (moscato *Moscato) Recieve(msg MsgUnit) (MsgUnit, error) {
  32. msg_type := msg.CheckType()
  33. //메세지 타입에 따라 다르게 처리
  34. switch msg_type {
  35. case KSM: //Key share msg
  36. case PM: //Publish msg
  37. moscato.queue.push(msg.(*PublishMsg))
  38. case SM: //Subscription msg
  39. moscato.match_mng.add_subscription(msg.(*SubscriptionMsg))
  40. case RM: //Register msg
  41. //var newmsg RegisterMsg
  42. var newmsg = msg.(*RegisterMsg)
  43. newNode := MSnode{newmsg.from, newmsg.from}
  44. moscato.ms_mng.add_microservice(newNode)
  45. case WM: //Withdraw msg
  46. moscato.ms_mng.remove_microservice(msg.(*WithdrawMsg).from)
  47. default:
  48. return nil, errors.New("Message type Error: Not registered message type")
  49. }
  50. return msg, nil
  51. }
  52. func (moscato *Moscato) Run() {
  53. //모스카토 구조체 변수 초기화
  54. moscato.queue.queue_init()
  55. //go routine -> matching 동작
  56. go moscato.match_mng.matching(&moscato.queue)
  57. //rpc 등록 -> Receive함수
  58. rpc.Register(moscato)
  59. moscato.Listen()
  60. }
  61. func (moscato *Moscato) Listen() {
  62. l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
  63. if err != nil {
  64. log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err))
  65. }
  66. defer l.Close()
  67. for {
  68. conn, _ := l.Accept()
  69. go rpc.ServeConn(conn)
  70. }
  71. }