Moscato_DemoClient
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

main.go 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. package main
  2. // TODO: Block sending PM/SM until registration has been confirmed; after building an executable, test with several microservices running on AWS.
  3. import (
  4. "bufio"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "github.com/fatih/color"
  9. "log"
  10. "math/rand"
  11. "net"
  12. "net/rpc"
  13. "os"
  14. "os/signal"
  15. "strconv"
  16. "strings"
  17. "syscall"
  18. "unicode"
  19. )
  // Message kind identifiers stored in Message.Kind and Args.Kind.
  // Numbering starts at 1 and follows declaration order.
  const (
  	KGM = iota + 1 // NOTE(review): meaning is not shown in this file — confirm
  	KSM            // key share message
  	PM             // publish message
  	SM             // subscription message
  	RM             // register message
  	WM             // withdraw message
  )
  // Message is the common header embedded in every concrete message type.
  type Message struct {
  	// From is the sender's IP address.
  	From string
  	// Version and Time are free-form strings; main always sends "1" for both.
  	Version, Time string
  	// Kind is the message kind (one of the KGM..WM constants).
  	Kind int
  }
  // MsgUnit is the behaviour shared by all message types handled in this file.
  type MsgUnit interface {
  	// CheckType reports the kind of the message.
  	CheckType() int
  	// ConvertToJson encodes the message as JSON before it is sent.
  	ConvertToJson() ([]byte, error)
  }
  40. type RegisterMsg struct {
  41. Message
  42. PrivateKey int64
  43. }
  44. type SubscriptionMsg struct {
  45. Message
  46. Topic []int64 //대주제
  47. Value []int64 //피연산자
  48. Operator []string //연산자
  49. IsAlpha bool //value가 숫자인지 문자열인지
  50. }
  51. type PublishMsg struct {
  52. Message
  53. Topic []int64 //대주제
  54. Value []int64 //Topic 의 세부적인 내용
  55. Content []int64 // 내용
  56. }
  57. type WithdrawMsg struct {
  58. Message
  59. }
  60. type Receiver struct {
  61. thisNodeAddr string
  62. microService MicroService
  63. }
  // MicroService holds the client-side state: addresses, keys, and the
  // channel used to signal that registration was acknowledged.
  type MicroService struct {
  	// ClientAddr is this client's address, used as Message.From.
  	ClientAddr string
  	// PrivateKey and ShareKey are the two offsets applied by the
  	// Encryption*/Decryption* helpers.
  	PrivateKey, ShareKey int64
  	// IsConnected signals whether the client is connected to the message
  	// middleware: Receive sends true on it when an RM arrives.
  	IsConnected chan bool
  	// MMAddress is the message middleware host that main dials.
  	MMAddress string
  }
  72. func (receiver Receiver) Receive(args Args, reply *Reply) error{
  73. switch args.Kind {
  74. case KSM:
  75. //var msg KeyShareMsg
  76. //err := json.Unmarshal(args.JsonMsg, &msg)
  77. //if err != nil {
  78. // return err
  79. //}
  80. //go func() {
  81. // _, err := receiver.moscato.Receive(msg)
  82. // if err != nil {
  83. //
  84. // }
  85. //}()
  86. //reply.CompleteLog = "received completely"
  87. case PM:
  88. var msg PublishMsg
  89. err := json.Unmarshal(args.JsonMsg, &msg)
  90. if err != nil {
  91. return err
  92. }
  93. go func() {
  94. receiver.microService.Receive(msg)
  95. }()
  96. reply.CompleteLog = "[" + receiver.thisNodeAddr + "] : pubMsg received"
  97. //var msg PublishMsg
  98. //err := json.Unmarshal(args.JsonMsg, &msg)
  99. //if err != nil {
  100. // return err
  101. //}
  102. //
  103. //go func() {
  104. // _, err := receiver.moscato.Receive(msg)
  105. // if err != nil {
  106. //
  107. // }
  108. //}()
  109. //reply.CompleteLog = "PM received completely"
  110. case SM:
  111. //var msg SubscriptionMsg
  112. //err := json.Unmarshal(args.JsonMsg, &msg)
  113. //if err != nil {
  114. // return err
  115. //}
  116. //go func() {
  117. // _, err := receiver.moscato.Receive(msg)
  118. // if err != nil {
  119. //
  120. // }
  121. //}()
  122. //reply.CompleteLog = "received completely"
  123. case RM:
  124. var msg RegisterMsg
  125. err := json.Unmarshal(args.JsonMsg, &msg)
  126. if err != nil {
  127. return err
  128. }
  129. go func() {
  130. receiver.microService.Receive(msg)
  131. }()
  132. reply.CompleteLog = "[" + receiver.thisNodeAddr + "] : ackRM received"
  133. case WM:
  134. //var msg WithdrawMsg
  135. //err := json.Unmarshal(args.JsonMsg, &msg)
  136. //if err != nil {
  137. // return err
  138. //}
  139. //go func() {
  140. // _, err := receiver.moscato.Receive(msg)
  141. // if err != nil {
  142. //
  143. // }
  144. //}()
  145. //reply.CompleteLog = "received completely"
  146. default:
  147. return errors.New("message type Error: Not registered message type")
  148. }
  149. //reply.CompleteLog = "received completely"
  150. return nil
  151. }
  152. func (microService MicroService) Receive(msg MsgUnit){
  153. var msg_type = msg.CheckType()
  154. //메세지 타입에 따라 다르게 처리
  155. switch msg_type {
  156. case KSM: //Key share msg
  157. case PM: //Publish msg
  158. fmt.Println("----- received PM ------")
  159. DecryptionMsg(msg.(PublishMsg),microService.ShareKey, microService.PrivateKey)
  160. fmt.Println("------------------------")
  161. case SM: //Subscription msg
  162. case RM: //Register msg
  163. microService.IsConnected <- true
  164. log.Println("you received RM: Registered Complete!")
  165. case WM: //Withdraw msg
  166. //moscato.MicroServiceManager.RemoveMicroservice(msg.(*WithdrawMsg).From)
  167. default:
  168. errors.New("message type Error: Not registered message type")
  169. }
  170. return
  171. }
  172. func (msg SubscriptionMsg) ConvertToJson() ([]byte, error) {
  173. js := msg
  174. jsonBytes, err := json.Marshal(js)
  175. return jsonBytes, err
  176. }
  177. func (msg PublishMsg) ConvertToJson() ([]byte, error) {
  178. js := msg
  179. jsonBytes, err := json.Marshal(js)
  180. return jsonBytes, err
  181. }
  182. func (msg RegisterMsg) ConvertToJson() ([]byte, error) {
  183. js := msg
  184. jsonBytes, err := json.Marshal(js)
  185. return jsonBytes, err
  186. }
  187. func (msg WithdrawMsg) ConvertToJson() ([]byte, error) {
  188. js := msg
  189. jsonBytes, err := json.Marshal(js)
  190. return jsonBytes, err
  191. }
  192. func (msg Message) CheckType() int {
  193. return msg.Kind
  194. }
  195. func CreatePubMsg(msg Message, topic string, value string, content string) PublishMsg {
  196. //toPubMsg := new(PublishMsg)
  197. //toPubMsg.Message = msg
  198. intArr := []rune(topic)
  199. //fmt.Print("Topic length ")
  200. //fmt.Println(len(intArr))
  201. //fmt.Println(len(toPubMsg.Topic))
  202. var topicInt [] int64
  203. for index := 0; index < len(intArr); index++ {
  204. //toPubMsg.Topic = append(toPubMsg.Topic, int64(intArr[index]))
  205. topicInt = append(topicInt, int64(intArr[index]))
  206. }
  207. //fmt.Println(len(toPubMsg.Topic))
  208. intArr = []rune(value)
  209. var valueInt [] int64
  210. strInt64, _ := strconv.ParseInt(value, 10, 64)
  211. if unicode.IsDigit(intArr[0]){
  212. valueInt = append(valueInt, strInt64)
  213. } else {
  214. for index := 0; index < len(intArr); index++ {
  215. //toPubMsg.Value = append(toPubMsg.Value, int64(intArr[index]))
  216. valueInt = append(valueInt, int64(intArr[index]))
  217. }
  218. }
  219. intArr = []rune(content)
  220. var contentInt [] int64
  221. for index := 0; index < len(intArr); index++ {
  222. //toPubMsg.content = append(toPubMsg.content, int64(intArr[index]))
  223. contentInt = append(contentInt, int64(intArr[index]))
  224. }
  225. return PublishMsg{msg,topicInt,valueInt,contentInt}
  226. }
  227. func CreateSubMsg(msg Message, topic string, value string, operator string, isAlpha bool) SubscriptionMsg {
  228. //toPubMsg := new(PublishMsg)
  229. //toPubMsg.Message = msg
  230. intArr := []rune(topic)
  231. //fmt.Print("Topic length ")
  232. //fmt.Println(len(intArr))
  233. //fmt.Println(len(toPubMsg.Topic))
  234. var topicInt [] int64
  235. for index := 0; index < len(intArr); index++ {
  236. //toPubMsg.Topic = append(toPubMsg.Topic, int64(intArr[index]))
  237. topicInt = append(topicInt, int64(intArr[index]))
  238. }
  239. //fmt.Println(len(toPubMsg.Topic))
  240. var valueInt [] int64
  241. if isAlpha {
  242. intArr = []rune(value)
  243. for index := 0; index < len(intArr); index++ {
  244. //toPubMsg.Value = append(toPubMsg.Value, int64(intArr[index]))
  245. valueInt = append(valueInt, int64(intArr[index]))
  246. }
  247. } else {
  248. stringSlice := strings.Split(value, " ")
  249. for index:=0; index < len(stringSlice); index++ {
  250. strInt64, _ := strconv.ParseInt(stringSlice[index], 10, 64)
  251. valueInt = append(valueInt, strInt64)
  252. }
  253. }
  254. var operatorSlice []string
  255. operatorSlice = strings.Split(operator, " ")
  256. //intArr = []rune(content)
  257. //var contentInt [] int64
  258. //for index := 0; index < len(intArr); index++ {
  259. // //toPubMsg.content = append(toPubMsg.content, int64(intArr[index]))
  260. // contentInt = append(contentInt, int64(intArr[index]))
  261. //}
  262. return SubscriptionMsg{msg,topicInt,valueInt,operatorSlice, isAlpha}
  263. }
  // Args is the RPC argument: a JSON-encoded message together with its kind.
  type Args struct {
  	// JsonMsg is the JSON encoding of the message.
  	JsonMsg []byte
  	// Kind tells the receiver which message type JsonMsg holds.
  	Kind int
  }
  // Reply is the RPC return value.
  type Reply struct {
  	// CompleteLog is the acknowledgement text filled in by the receiver.
  	CompleteLog string
  }
  // generateRanUint64 returns a pseudo-random 64-bit value assembled from two
  // rand.Uint32 draws: the first draw becomes the high 32 bits, the second the
  // low 32 bits.
  func generateRanUint64() uint64 {
  	high := uint64(rand.Uint32())
  	low := uint64(rand.Uint32())
  	return high<<32 | low
  }
  274. func main(){
  275. var argu string
  276. for _, v := range os.Args {
  277. if strings.Index(v, "-mma") == 0 {
  278. argu = strings.Replace(v, "-mma=", "",-1)
  279. break
  280. }
  281. }
  282. MMAddress := argu
  283. if argu == "" {
  284. println("there is no Message Middleware address")
  285. return
  286. }
  287. args := new(Args)
  288. reply := new(Reply)
  289. currentIP := getCurrentIPAddr()
  290. const debugPK = 12345
  291. privateKey := int64(generateRanUint64())
  292. shareKey := int64(9999)
  293. microService := MicroService{currentIP,privateKey, shareKey, make(chan bool), MMAddress}
  294. receiver := Receiver{thisNodeAddr: currentIP,microService: microService}
  295. color.Blue("<current machine address : " + currentIP + "> <MM address : " + MMAddress + ">")
  296. color.Blue("<private key : " + strconv.FormatUint(uint64(privateKey), 10)+">")
  297. errReg := rpc.Register(receiver)
  298. if errReg != nil {
  299. log.Println(errReg)
  300. return
  301. }
  302. client, err := rpc.Dial("tcp", microService.MMAddress + ":8160") // RPC 서버에 연결
  303. if err != nil {
  304. fmt.Println(err)
  305. return
  306. }
  307. defer client.Close() // main 함수가 끝나기 직전에 RPC 연결을 닫음
  308. go Listen()
  309. // Register 메세지 생성
  310. message := Message{From: microService.ClientAddr, Version: "1", Time: "1", Kind: RM}
  311. regMsg := RegisterMsg{message, microService.PrivateKey}
  312. RJsonMsg,_ := regMsg.ConvertToJson()
  313. args.JsonMsg = RJsonMsg
  314. args.Kind = RM
  315. err = client.Call("Receiver.MmReceive", args, reply)
  316. log.Println("RM sent!")
  317. if err != nil {
  318. fmt.Println(err)
  319. return
  320. }
  321. color.Yellow("MM: " + reply.CompleteLog)
  322. // 연결 되면 채널로 연결 확인이 되어야 다음 단계로 넘어감
  323. checkConnect := <- microService.IsConnected
  324. _ = checkConnect
  325. fmt.Scanln()
  326. fmt.Println("***** sending Subscription messages *****")
  327. /*
  328. 파일에서 subscription 읽어서 subscription 보내기
  329. **/
  330. subFile, err := os.Open("subscription.txt")
  331. if err != nil {
  332. log.Fatalf("Error when opening file: %s", err)
  333. }
  334. defer subFile.Close()
  335. fileScanner := bufio.NewScanner(subFile)
  336. var subscriptionText string
  337. numSub := 0
  338. for fileScanner.Scan(){
  339. subscriptionText = fileScanner.Text()
  340. subscriptionTextSlice := strings.Split(subscriptionText, "/")
  341. subTopic := subscriptionTextSlice[0]
  342. subValue := subscriptionTextSlice[1]
  343. subOperator := subscriptionTextSlice[2]
  344. var subIsAlpha bool
  345. if subscriptionTextSlice[3] == "true"{
  346. subIsAlpha = true
  347. } else if subscriptionTextSlice[3] == "false" {
  348. subIsAlpha = false
  349. } else {
  350. println(numSub, "subscription isAlpha type error")
  351. }
  352. message = Message{From: microService.ClientAddr, Version: "1", Time: "1", Kind: SM}
  353. subMsg := CreateSubMsg(message, subTopic, subValue, subOperator, subIsAlpha)
  354. //subMsg := CreateSubMsg(message, "soccer", "player", "==", true)
  355. //fmt.Println(pubMsg)
  356. subMsg = EncryptionSubMsg(subMsg,microService.ShareKey,microService.PrivateKey)
  357. jsonMsg,_ := subMsg.ConvertToJson()
  358. args.JsonMsg = jsonMsg
  359. args.Kind = subMsg.Kind
  360. //fmt.Println(subMsg)
  361. //fmt.Println(string(args.JsonMsg))
  362. err = client.Call("Receiver.MmReceive", args, reply)
  363. log.Println("SM sent! #:",numSub)
  364. if err != nil {
  365. fmt.Println(err)
  366. return
  367. }
  368. //log.Println(reply.CompleteLog)
  369. color.Yellow("MM: " + reply.CompleteLog)
  370. }
  371. fmt.Scanln()
  372. fmt.Println("***** sending Publish messages *****")
  373. pubFile, err := os.Open("publish.txt")
  374. if err != nil {
  375. log.Fatalf("Error when opening file: %s", err)
  376. }
  377. defer pubFile.Close()
  378. fileScanner2 := bufio.NewScanner(pubFile)
  379. var publishText string
  380. numPub := 0
  381. for fileScanner2.Scan(){
  382. publishText = fileScanner2.Text()
  383. publishTextSlice := strings.Split(publishText, "/")
  384. //fmt.Println(publishTextSlice)
  385. pubTopic := publishTextSlice[0]
  386. pubValue := publishTextSlice[1]
  387. pubContent := publishTextSlice[2]
  388. message = Message{From: microService.ClientAddr, Version: "1", Time: "1", Kind: PM}
  389. pubMsg := CreatePubMsg(message, pubTopic, pubValue, pubContent)
  390. //subMsg := CreateSubMsg(message, "soccer", "player", "==", true)
  391. //fmt.Println("Pub.txt",pubMsg)
  392. pubMsg = EncryptionPubMsg(pubMsg,microService.ShareKey,microService.PrivateKey)
  393. jsonMsg,_ := pubMsg.ConvertToJson()
  394. args.JsonMsg = jsonMsg
  395. args.Kind = pubMsg.Kind
  396. //fmt.Println(subMsg)
  397. //fmt.Println(string(args.JsonMsg))
  398. err = client.Call("Receiver.MmReceive", args, reply)
  399. log.Println("PM sent! #:",numPub)
  400. if err != nil {
  401. fmt.Println(err)
  402. return
  403. }
  404. //log.Println(reply.CompleteLog)
  405. color.Yellow("MM: " + reply.CompleteLog)
  406. }
  407. sigs := make(chan os.Signal, 1)
  408. done := make(chan bool, 1)
  409. signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  410. go func() {
  411. sig := <-sigs
  412. withDraw(message, client)
  413. //fmt.Println(sig)
  414. _ = sig
  415. done <- true
  416. fmt.Println("\nquit Moscato Successful and terminate microservice")
  417. }()
  418. <-done
  419. return
  420. }
  421. func withDraw(msg Message, client *rpc.Client){
  422. msg.Kind = WM
  423. wdMsg := WithdrawMsg{msg}
  424. var args Args
  425. args.JsonMsg, _ = wdMsg.ConvertToJson()
  426. args.Kind = WM
  427. var reply Reply
  428. client.Call("Receiver.MmReceive", args, reply)
  429. }
  430. func EncryptionPubMsg(msg PublishMsg, gyKey int64, privateKey int64) PublishMsg {
  431. for index := range msg.Topic {
  432. msg.Topic[index] = msg.Topic[index] + gyKey + privateKey
  433. }
  434. for index := range msg.Value {
  435. msg.Value[index] = msg.Value[index] + gyKey + privateKey
  436. }
  437. for index := range msg.Content {
  438. msg.Content[index] = msg.Content[index] + gyKey + privateKey
  439. }
  440. return msg
  441. }
  442. func EncryptionSubMsg(msg SubscriptionMsg, gyKey int64, privateKey int64) SubscriptionMsg {
  443. for index := range msg.Topic {
  444. msg.Topic[index] = msg.Topic[index] + gyKey + privateKey
  445. }
  446. for index := range msg.Value {
  447. msg.Value[index] = msg.Value[index] + gyKey + privateKey
  448. }
  449. return msg
  450. }
  451. func DecryptionMsg(msg PublishMsg, gyKey int64, privateKey int64) {
  452. for index := range msg.Topic {
  453. msg.Topic[index] = msg.Topic[index] - gyKey - privateKey
  454. }
  455. for index := range msg.Value {
  456. msg.Value[index] = msg.Value[index] - gyKey - privateKey
  457. }
  458. for index := range msg.Content {
  459. msg.Content[index] = msg.Content[index] - gyKey - privateKey
  460. }
  461. var runeArr []rune
  462. for index := range msg.Topic {
  463. runeArr = append(runeArr, rune(int(msg.Topic[index])))
  464. }
  465. fmt.Println("Topic is: " + string(runeArr))
  466. runeArr = nil
  467. for index := range msg.Value {
  468. runeArr = append(runeArr, rune(int(msg.Value[index])))
  469. }
  470. if len(runeArr) == 1{
  471. fmt.Println("Value is: ", runeArr)
  472. } else {
  473. fmt.Println("Value is:", string(runeArr))
  474. }
  475. //if unicode.IsDigit(runeArr[0]){
  476. // fmt.Println("Value is: " + string(runeArr))
  477. //} else {
  478. // //for index := 0; index < len(intArr); index++ {
  479. // // //toPubMsg.Value = append(toPubMsg.Value, int64(intArr[index]))
  480. // // valueInt = append(valueInt, int64(intArr[index]))
  481. // //}
  482. //}
  483. runeArr = nil
  484. for index := range msg.Content {
  485. runeArr = append(runeArr, rune(int(msg.Content[index])))
  486. }
  487. fmt.Println("Content is: " + string(runeArr))
  488. runeArr = nil
  489. }
  // Listen accepts TCP connections on port 8150 (messages coming from the
  // message middleware) and serves each one with net/rpc in its own goroutine.
  //
  // If the port cannot be opened the process is terminated via log.Fatalf.
  // If Accept fails, the error is logged and Listen returns, so a nil
  // connection is never passed to rpc.ServeConn.
  func Listen() {
  	listener, err := net.Listen("tcp", ":8150")
  	if err != nil {
  		log.Fatalf("Unable to listen on given port: %s", err)
  	}
  	defer listener.Close()

  	for {
  		conn, err := listener.Accept()
  		if err != nil {
  			log.Println("Listen: accept failed, stopping listener:", err)
  			return
  		}
  		go rpc.ServeConn(conn)
  	}
  }