Project Moscato Team Messaging Middleware Implemetation Message Middleware by Golang Operate as Secure, Effectively
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

subscription.go 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package modules
  2. import (
  3. "errors"
  4. "fmt"
  5. )
  6. type sub_manager struct {
  7. list topicList
  8. /* Manage sub# */
  9. count_sub int // Subscription #
  10. emptylist []int // To administrate sub #
  11. ip2sub map[string][]int // ip2sub[ip] = sub# ...
  12. sub2ip map[int]string // sub2ip[sub#] = ip
  13. sub2node map[int][]*valueNode // sub2node[sub#] = node_addr ...
  14. israngesub map[int]bool // To manage when deleted
  15. }
  16. func (manager *sub_manager) Initialize() {
  17. // Some initialize
  18. manager.ip2sub = make(map[string][]int)
  19. manager.sub2ip = make(map[int]string)
  20. manager.sub2node = make(map[int][]*valueNode)
  21. manager.israngesub = make(map[int]bool)
  22. }
  23. func newSubmng() *sub_manager {
  24. subMng := &sub_manager{}
  25. subMng.Initialize()
  26. return subMng
  27. }
  28. // ### To Insert sub#
  29. func (manager *sub_manager) addSubscription(msg MsgUnit) error {
  30. topic := msg.(SubscriptionMsg).Topic
  31. value := msg.(SubscriptionMsg).Value
  32. operator := msg.(SubscriptionMsg).Operator
  33. subnumber := 0
  34. // * 1. Mapping incoming IP address to sub #
  35. fmt.Println("add sub st")
  36. fmt.Println("sub val", value)
  37. if len(manager.emptylist) == 0 {
  38. subnumber = manager.count_sub
  39. manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], manager.count_sub)
  40. manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
  41. manager.count_sub++
  42. } else {
  43. subnumber := manager.emptylist[len(manager.emptylist)-1]
  44. manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
  45. manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], subnumber)
  46. manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
  47. }
  48. nameptr := manager.list.head
  49. findOk := false
  50. // * 2. Add Subscription
  51. // Find name in namelist, add if not found
  52. for nameptr != nil {
  53. if Compare(nameptr.topic, topic) == 0 {
  54. findOk = true
  55. break
  56. }
  57. nameptr = nameptr.next
  58. }
  59. if !findOk {
  60. manager.list.addTopicNode(topic)
  61. nameptr = manager.list.tail
  62. }
  63. // Add Value to list[name]
  64. if len(operator) == 1 { // if single expression
  65. valptr := nameptr.list.getValueNodePos(value)
  66. if valptr == nil {
  67. nameptr.list.addValueNode(value)
  68. valptr = nameptr.list.tail
  69. }
  70. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr)
  71. valptr.insertSub(operator[0], subnumber, true)
  72. return nil // AddSubscription ok
  73. } else {
  74. // For compound expressions bounded by '&&' and '||'
  75. // (ex) { (234 < x) && (x <= 1293) } , { (234 < x) || ( x < 1293) }
  76. logical_operator := operator[2]
  77. // Find ValueNode = (namelist[name].list.val == Value)
  78. valptr1 := nameptr.list.getValueNodePos([]int64{value[0]})
  79. valptr2 := nameptr.list.getValueNodePos([]int64{value[2]})
  80. if valptr1 == nil {
  81. nameptr.list.addValueNode([]int64{value[0]})
  82. valptr1 = nameptr.list.tail
  83. }
  84. if valptr2 == nil {
  85. nameptr.list.addValueNode([]int64{value[2]})
  86. valptr2 = nameptr.list.tail
  87. }
  88. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr1)
  89. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], valptr2)
  90. if logical_operator == "&&" {
  91. // If they are enclosed in '&&' -> Insert Value to range_operator_list
  92. manager.israngesub[subnumber] = true
  93. valptr1.insertSub(operator[0], subnumber, false)
  94. valptr2.insertSub(operator[4], subnumber, false)
  95. } else {
  96. // if they are enclosed in '||' -> Insert Value to single_operator_list
  97. valptr1.insertSub(operator[0], subnumber, true)
  98. valptr2.insertSub(operator[4], subnumber, true)
  99. }
  100. return nil // addSubscription ok
  101. }
  102. return errors.New("Can't addSubscription")
  103. }
  104. // To delete subscription
  105. func (manager *sub_manager) delete(from string) error {
  106. ip := from
  107. cand := manager.ip2sub[ip]
  108. for i := 0; i < len(cand); i++ {
  109. sub := cand[i]
  110. node := manager.sub2node[sub]
  111. if manager.israngesub[sub] {
  112. for j := 0; j < len(node); j++ {
  113. pos := findSub(node[j].range2sub_s, sub)
  114. if pos != -1 {
  115. node[j].range2sub_s = remove(node[j].range2sub_s, pos)
  116. manager.emptylist = append(manager.emptylist, sub)
  117. }
  118. pos = findSub(node[j].range2sub_es, sub)
  119. if pos != -1 {
  120. node[j].range2sub_es = remove(node[j].range2sub_es, pos)
  121. manager.emptylist = append(manager.emptylist, sub)
  122. }
  123. pos = findSub(node[j].range2sub_b, sub)
  124. if pos != -1 {
  125. node[j].range2sub_b = remove(node[j].range2sub_b, pos)
  126. manager.emptylist = append(manager.emptylist, sub)
  127. }
  128. pos = findSub(node[j].range2sub_eb, sub)
  129. if pos != -1 {
  130. node[j].range2sub_eb = remove(node[j].range2sub_eb, pos)
  131. manager.emptylist = append(manager.emptylist, sub)
  132. }
  133. isempty := node[j].isEmpty()
  134. // Delete if Value Node is empty
  135. if isempty && node[j] != nil {
  136. prev_node := node[j].prev
  137. next_node := node[j].next
  138. prev_node.next = node[j].next
  139. next_node.prev = node[j].prev
  140. }
  141. }
  142. } else {
  143. for j := 0; j < len(node); j++ {
  144. pos := findSub(node[j].single2sub_s, sub)
  145. if pos != -1 {
  146. node[j].single2sub_s = remove(node[j].single2sub_s, pos)
  147. manager.emptylist = append(manager.emptylist, sub)
  148. }
  149. pos = findSub(node[j].single2sub_es, sub)
  150. if pos != -1 {
  151. node[j].single2sub_es = remove(node[j].single2sub_es, pos)
  152. manager.emptylist = append(manager.emptylist, sub)
  153. }
  154. pos = findSub(node[j].single2sub_b, sub)
  155. if pos != -1 {
  156. node[j].single2sub_b = remove(node[j].single2sub_b, pos)
  157. manager.emptylist = append(manager.emptylist, sub)
  158. }
  159. pos = findSub(node[j].single2sub_eb, sub)
  160. if pos != -1 {
  161. node[j].single2sub_eb = remove(node[j].single2sub_eb, pos)
  162. manager.emptylist = append(manager.emptylist, sub)
  163. }
  164. pos = findSub(node[j].single2sub_e, sub)
  165. if pos != -1 {
  166. node[j].single2sub_e = remove(node[j].single2sub_e, pos)
  167. manager.emptylist = append(manager.emptylist, sub)
  168. }
  169. isempty := node[j].isEmpty()
  170. // Delete if Value Node is empty
  171. if isempty && node[j] != nil {
  172. prevNode := node[j].prev
  173. nextNode := node[j].next
  174. prevNode.next = node[j].next
  175. nextNode.prev = node[j].prev
  176. }
  177. }
  178. }
  179. }
  180. manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
  181. return nil
  182. }