Project Moscato Team Messaging Middleware Implementation: message middleware written in Golang that operates securely and efficiently
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

subscription.go 6.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package modules
  2. import (
  3. "errors"
  4. )
  5. type sub_manager struct {
  6. // NameList의 노드들이 각각의 list를 갖음
  7. list NameList
  8. /* sub# 관리 */
  9. count_sub int // Subscription #
  10. emptylist []int // To administrate sub #
  11. sub2ip map[int]string // sub2ip[sub#] = ip
  12. ip2sub map[string][]int // ip2sub[ip]= sub#...
  13. sub2node map[int][]*Node // sub2node[sub#] = node_addr
  14. israngesub map[int]bool // delete시에 관리를 위해
  15. }
  16. // ### To Insert sub#
  17. func (manager *sub_manager) add_subscription(msg MsgUnit) error {
  18. topic := msg.(*SubscriptionMsg).topic
  19. value := msg.(*SubscriptionMsg).value
  20. operator := msg.(*SubscriptionMsg).operator
  21. subnumber := 0
  22. //fmt.Println("message = ", msg)
  23. // * 1. 들어온 Msg -> sub#, sub# -> Msg로 매핑
  24. if len(manager.emptylist) == 0 {
  25. manager.sub2ip[manager.count_sub] = msg.(*SubscriptionMsg).from
  26. manager.ip2sub[msg.(*SubscriptionMsg).from] = append(manager.ip2sub[msg.(*SubscriptionMsg).from], manager.count_sub)
  27. subnumber = manager.count_sub
  28. manager.count_sub++
  29. } else {
  30. subidx := manager.emptylist[len(manager.emptylist)-1]
  31. manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
  32. manager.sub2ip[subidx] = msg.(*SubscriptionMsg).from
  33. manager.ip2sub[msg.(*SubscriptionMsg).from] = append(manager.ip2sub[msg.(*SubscriptionMsg).from], subidx)
  34. subnumber = subidx
  35. }
  36. // * 2. Sub 추가
  37. nameptr := manager.list.head
  38. findOk := false
  39. //namelist에서 name찾고, 없으면 추가
  40. for nameptr != nil {
  41. // * compare함수 구현되기 전 임시로 끼워넣은 것 (이 부분은 secure.go 완성되면 다시 확인)
  42. if compare(nameptr.topic, topic) == 0 {
  43. findOk = true
  44. break
  45. }
  46. nameptr = nameptr.next
  47. }
  48. if !findOk {
  49. newNode := &NameNode{topic, nil, nil, List{}}
  50. manager.list.tail.next = newNode
  51. manager.list.tail = newNode
  52. manager.list.size++
  53. nameptr = manager.list.tail
  54. }
  55. // list[name]에 value추가
  56. if len(operator) == 1 { // 단일 식이라면 (single)
  57. valptr := nameptr.list.getPos(value)
  58. if valptr == nil {
  59. nameptr.list.add_ValueNode(value)
  60. valptr = nameptr.list.tail
  61. }
  62. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr)
  63. valptr.insert_Sub(operator[0], subnumber, true)
  64. return nil // add_subscription ok
  65. } else {
  66. // '&&' , '||'로 묶여 있는 복합식에 대하여
  67. logical_operator := operator[1]
  68. // find(namelist[name].list.val == value)인 노드
  69. // * 이 부분 다시 봐야함..
  70. valptr1 := nameptr.list.getPos(value)
  71. valptr2 := nameptr.list.getPos(value)
  72. if valptr1 == nil {
  73. nameptr.list.add_ValueNode(value)
  74. valptr1 = nameptr.list.tail
  75. }
  76. if valptr2 == nil {
  77. nameptr.list.add_ValueNode(value)
  78. valptr2 = nameptr.list.tail
  79. }
  80. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr1)
  81. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr2)
  82. if logical_operator == "&&" { // '&&'로 묶여있다면
  83. // rangesub check
  84. manager.israngesub[subnumber] = true
  85. valptr1.insert_Sub(operator[0], subnumber, false)
  86. valptr2.insert_Sub(operator[2], subnumber, false)
  87. } else {
  88. // '||'로 묶여 있다면 두 식 모두 single로 처리가능
  89. valptr1.insert_Sub(operator[0], subnumber, true)
  90. valptr2.insert_Sub(operator[2], subnumber, true)
  91. }
  92. return nil // add_subscription ok
  93. }
  94. return errors.New("Can't add_subscription")
  95. }
  96. // * To delete subscription
  97. func (manager *sub_manager) delete(from string) error {
  98. ip := from
  99. cand := manager.ip2sub[ip]
  100. for i := 0; i < len(cand); i++ {
  101. sub := cand[i]
  102. node := manager.sub2node[sub]
  103. if manager.israngesub[sub] {
  104. for j := 0; j < len(node); j++ {
  105. pos := findSub(node[j].range2sub_s, sub)
  106. if pos != -1 {
  107. node[j].range2sub_s = remove(node[j].range2sub_s, pos)
  108. manager.emptylist = append(manager.emptylist, sub)
  109. }
  110. pos = findSub(node[j].range2sub_es, sub)
  111. if pos != -1 {
  112. node[j].range2sub_es = remove(node[j].range2sub_es, pos)
  113. manager.emptylist = append(manager.emptylist, sub)
  114. }
  115. pos = findSub(node[j].range2sub_b, sub)
  116. if pos != -1 {
  117. node[j].range2sub_b = remove(node[j].range2sub_b, pos)
  118. manager.emptylist = append(manager.emptylist, sub)
  119. }
  120. pos = findSub(node[j].range2sub_eb, sub)
  121. if pos != -1 {
  122. node[j].range2sub_eb = remove(node[j].range2sub_eb, pos)
  123. manager.emptylist = append(manager.emptylist, sub)
  124. }
  125. // node가 비어있으면 delete
  126. isempty := node[j].isempty()
  127. if isempty && node[j] != nil {
  128. prev_node := node[j].prev
  129. next_node := node[j].next
  130. prev_node.next = node[j].next
  131. next_node.prev = node[j].prev
  132. }
  133. }
  134. } else {
  135. for j := 0; j < len(node); j++ {
  136. pos := findSub(node[j].single2sub_s, sub)
  137. if pos != -1 {
  138. node[j].single2sub_s = remove(node[j].single2sub_s, pos)
  139. manager.emptylist = append(manager.emptylist, sub)
  140. }
  141. pos = findSub(node[j].single2sub_es, sub)
  142. if pos != -1 {
  143. node[j].single2sub_es = remove(node[j].single2sub_es, pos)
  144. manager.emptylist = append(manager.emptylist, sub)
  145. }
  146. pos = findSub(node[j].single2sub_b, sub)
  147. if pos != -1 {
  148. node[j].single2sub_b = remove(node[j].single2sub_b, pos)
  149. manager.emptylist = append(manager.emptylist, sub)
  150. }
  151. pos = findSub(node[j].single2sub_eb, sub)
  152. if pos != -1 {
  153. node[j].single2sub_eb = remove(node[j].single2sub_eb, pos)
  154. manager.emptylist = append(manager.emptylist, sub)
  155. }
  156. pos = findSub(node[j].single2sub_e, sub)
  157. if pos != -1 {
  158. node[j].single2sub_e = remove(node[j].single2sub_e, pos)
  159. manager.emptylist = append(manager.emptylist, sub)
  160. }
  161. isempty := node[j].isempty()
  162. // node가 비어있으면 delete
  163. if isempty && node[j] != nil {
  164. prev_node := node[j].prev
  165. next_node := node[j].next
  166. prev_node.next = node[j].next
  167. next_node.prev = node[j].prev
  168. }
  169. }
  170. }
  171. }
  172. manager.ip2sub[ip]=nil
  173. return nil
  174. }
  // compare is the temporary comparison function for two encrypted values.
  // It currently ignores both arguments and always returns 1; callers in this
  // file treat 0 as "match", so no pair of values ever matches.
  func compare(v1, v2 []int64) int {
  	const placeholderResult = 1
  	return placeholderResult
  }