| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- package modules
-
- import (
- "encoding/json"
- "errors"
- "fmt"
- "github.com/fatih/color"
- "log"
- "net"
- "net/rpc"
- )
-
- // TODO: RM 처리와 init.go 전반적으로 DI 이용하여 구현 RM 키등록 부분 구현 예정
-
- type Moscato struct {
- queue MsgQueue
- SendQueue chan myType
- MicroServiceManager NodeManager `inject:""`
- MatchingManager match_manager
- SubscriptionManager sub_manager
- SecureManager SecurityManager `inject:""`
- }
-
- type myType struct {
- subList []string
- pubMsg MsgUnit
- err error
- }
-
- type Reply struct {
- CompleteLog string
- }
-
- type Receiver struct {
- moscato *Moscato
- }
-
- type Args struct { // 매개변수
- JsonMsg []byte
- Kind int
- }
-
- func (receiver Receiver) MmReceive(args Args, reply *Reply) error {
- // 메세지 별로 나눠서 언마샬하면 됨
- switch args.Kind {
-
- case KSM:
- var msg KeyShareMsg
- err := json.Unmarshal(args.JsonMsg, &msg)
- if err != nil {
- return err
- }
- go func() {
- _, err := receiver.moscato.Receive(msg)
- if err != nil {
-
- }
- }()
- reply.CompleteLog = "received"
- case PM:
- var msg PublishMsg
- err := json.Unmarshal(args.JsonMsg, &msg)
- if err != nil {
- return err
- }
-
- go func() {
- _, err := receiver.moscato.Receive(msg)
- if err != nil {
-
- }
- }()
- reply.CompleteLog = "PM received"
- case SM:
- var msg SubscriptionMsg
- err := json.Unmarshal(args.JsonMsg, &msg)
- if err != nil {
- return err
- }
- go func() {
- _, err := receiver.moscato.Receive(msg)
- if err != nil {
-
- }
- }()
- reply.CompleteLog = "SM received"
- case RM:
- var msg RegisterMsg
- err := json.Unmarshal(args.JsonMsg, &msg)
- if err != nil {
- return err
- }
- go func() {
- _, err := receiver.moscato.Receive(msg)
- if err != nil {
- fmt.Println(err)
- }
- }()
- reply.CompleteLog = "RM received"
- case WM:
- var msg WithdrawMsg
- err := json.Unmarshal(args.JsonMsg, &msg)
- if err != nil {
- return err
- }
- go func() {
- _, err := receiver.moscato.Receive(msg)
- if err != nil {
-
- }
- }()
- reply.CompleteLog = "WM received"
- default:
- return errors.New("message type Error: Not registered message type")
- }
- //reply.CompleteLog = "received completely"
- return nil
- }
-
- func (moscato *Moscato) preProcessMsg(originalMsg MsgUnit) MsgUnit {
- if originalMsg.CheckType() == PM {
- pubMsg := originalMsg.(PublishMsg)
- for index := 0; index < len(pubMsg.Topic); index++ {
- pubMsg.Topic[index] = pubMsg.Topic[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
- }
- for index := 0; index < len(pubMsg.Value); index++ {
- pubMsg.Value[index] = pubMsg.Value[index] - moscato.SecureManager.GetNodeKey(pubMsg.From)
- }
- return pubMsg
- } else if originalMsg.CheckType() == SM {
- subMsg := originalMsg.(SubscriptionMsg)
- for index := 0; index < len(subMsg.Topic); index++ {
- subMsg.Topic[index] = subMsg.Topic[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
- }
- for index := 0; index < len(subMsg.Value); index++ {
- subMsg.Value[index] = subMsg.Value[index] - moscato.SecureManager.GetNodeKey(subMsg.From)
- }
- return subMsg
- }
- return nil
- }
-
- func (moscato *Moscato) SendWithEncrypt() MsgUnit {
- for {
- mt := <-moscato.SendQueue
- fmt.Println(mt)
- if mt.err == nil {
- for index := 0; index < len(mt.subList); index++ {
- tmpNode := mt.subList[index]
- tmpNodeIpAddr, _ := moscato.MicroServiceManager.GetIpaddr(tmpNode)
- moscato.SecureManager.ReEncPubMsg(mt.pubMsg.(PublishMsg), tmpNode)
- moscato.Send2MS(tmpNodeIpAddr, mt.pubMsg)
- }
- }
- return nil
- }
- }
-
- //Recieve - MM가 msg전달 받음
- func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
- //rpc call
- var msg_type = msg.CheckType()
- //메세지 타입에 따라 다르게 처리
- switch msg_type {
-
- case KSM: //Key share msg
-
- case PM: //Publish msg
- log.Println("PM received")
- fmt.Println("PM: ", msg)
- moscato.queue.push(moscato.preProcessMsg(msg))
- fmt.Println("pushed")
-
- case SM: //Subscription msg
- err := moscato.SubscriptionManager.addSubscription(moscato.preProcessMsg(msg))
- if err != nil {
- println(err)
- //return nil, err
- }
-
- case RM: //Register msg
- log.Println("RM received")
- var newMsg RegisterMsg
- newMsg = msg.(RegisterMsg)
-
- newNode := MSNode{newMsg.From, newMsg.From}
- resultAddNode := moscato.MicroServiceManager.AddMicroservice(newNode)
- if resultAddNode {
- log.Println("Node added successful")
- } else {
- log.Println("Node is already added, ignore RM")
- return msg, nil
- }
-
- addr, _ := moscato.MicroServiceManager.GetIpaddr(newMsg.From)
- moscato.SecureManager.RegKey(newMsg)
- fmt.Println("Registered microservice: address", addr,
- "/ key", moscato.SecureManager.GetNodeKey(newMsg.From))
-
- // ackRM 메세지 전송
- go moscato.Send2MS(addr, newMsg)
-
- case WM: //Withdraw msg
- moscato.MicroServiceManager.RemoveMicroservice(msg.(WithdrawMsg).From)
- moscato.SecureManager.RemoveSecureKey(msg.(WithdrawMsg).From)
-
- default:
- return nil, errors.New("Message type Error: Not registered message type")
- }
-
- return msg, nil
- }
-
- //Ms로 보낼때 쓸 함수
- func (moscato *Moscato) Send2MS(ipaddress string, msg MsgUnit) {
- client, err := rpc.Dial("tcp", ipaddress+":8150")
- if err != nil {
- fmt.Println(err)
- return
- }
- defer client.Close()
-
- reply := new(Reply)
- jmsg, _ := msg.ConvertToJson()
- args := Args{
- JsonMsg: jmsg,
- Kind: msg.CheckType(),
- }
- err = client.Call("Receiver.Receive", args, reply)
- if err != nil {
- fmt.Println(err)
- return
- }
- //log.Println(reply.CompleteLog) //잘 받았는지 확인 해줌
- // 마이크로 서비스에게 받은 메시지는 노란색으로 출력
- log.Println(reply.CompleteLog)
- }
-
- func (moscato *Moscato) Run() {
-
- config := AppConfig{moscato}
- config.config()
-
- //모스카토 구조체 변수 초기화
- receiver := Receiver{moscato: moscato}
- err := moscato.queue.queue_init()
- moscato.SendQueue = make(chan myType)
- moscato.SubscriptionManager.Initialize()
-
- if err != nil {
- fmt.Println(err)
- return
- }
-
- //go routine -> matching 동작
- go func() {
- for {
- msg := moscato.queue.pop(true)
- fmt.Println(msg)
- go moscato.Matching(msg)
- go moscato.SendWithEncrypt()
- }
- }()
-
- //go moscato.CheckQueue()
-
- //rpc 등록 -> Receive 함수
- err = rpc.Register(receiver)
- if err != nil {
- log.Println(err)
- return
- }
-
- go Listen()
- color.Blue("initializing complete.")
- fmt.Scanln()
- }
-
- func Listen() {
- //l, err := net.Listen("tcp", fmt.Sprintf(":%v", 8160))
- l, err1 := net.Listen("tcp", ":8160") //MS로 부터 받는거
- //l2, err2 := net.Listen("tcp","0.0.0.0:8150")
- if err1 != nil {
- log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err1))
- }
- defer l.Close()
-
- /*if err2 != nil {
- log.Fatal(fmt.Sprintf("Unable to listen on given port: %s", err2))
- }
- defer l2.Close()*/
-
- for {
- conn, _ := l.Accept()
- go rpc.ServeConn(conn)
- }
- /*for {
- conn, _ := l2.Accept()
- go rpc.ServeConn(conn)
- }*/
- }
|