Project Moscato Team Messaging Middleware Implementation: message middleware written in Go that operates securely and effectively
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

subscription.go 7.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package modules
  2. import (
  3. "errors"
  4. )
  5. // Structure for managing subscriptions
  6. type sub_manager struct {
  7. list topicList // Data Structure for Manage TopicNode
  8. count_sub int // Subscription#
  9. emptylist []int // For administrate Subscription#(Deleted)
  10. ip2sub map[string][]int // For mapping {ip : Sub#s List}
  11. sub2ip map[int]string // For mapping {Sub# : ip}
  12. sub2node map[int][]nodeInfo // For mapping {Sub# : NodeInfo List}
  13. israngesub map[int]bool // To manage when deleted
  14. }
  15. type nodeInfo struct{
  16. valNodeList []*valueNode
  17. topic []int64
  18. }
  19. func (manager *sub_manager) Initialize() {
  20. // Some initialize
  21. manager.ip2sub = make(map[string][]int)
  22. manager.sub2ip = make(map[int]string)
  23. manager.sub2node = make(map[int][]nodeInfo)
  24. manager.israngesub = make(map[int]bool)
  25. }
  26. func newSubmng() *sub_manager {
  27. subMng := &sub_manager{}
  28. subMng.Initialize()
  29. return subMng
  30. }
  31. func (manager *sub_manager) isDuplicated(msg MsgUnit) bool{
  32. from := msg.(SubscriptionMsg).From
  33. topic := msg.(SubscriptionMsg).Topic
  34. value := msg.(SubscriptionMsg).Value
  35. operator := msg.(SubscriptionMsg).Operator
  36. subList := manager.ip2sub[from]
  37. canFind := false
  38. for i := 0; i < len(subList) && canFind == false ; i++{
  39. sub := subList[i]
  40. nodeinfoList := manager.sub2node[sub]
  41. for j := 0; j < len(nodeinfoList); j++{
  42. node := nodeinfoList[j]
  43. if Compare(node.topic, topic) == 0 {
  44. cnt := 0
  45. if len(operator) == 1{
  46. valPtr := node.valNodeList[0]
  47. if Compare(valPtr.val ,value) == 0{
  48. op := operator[0]
  49. cnt += valPtr.findOperatorList(op, sub, true)
  50. }
  51. } else {
  52. leftop := operator[0]
  53. logicalop := operator[1]
  54. rightop := operator[2]
  55. leftValuePtr := node.valNodeList[0]
  56. rightValuePtr := node.valNodeList[1]
  57. nodeValList := []int64{leftValuePtr.val[0], rightValuePtr.val[0]}
  58. if Compare(nodeValList, value) == 0 {
  59. if logicalop == "&&" {
  60. cnt += leftValuePtr.findOperatorList(leftop, sub, false)
  61. cnt += rightValuePtr.findOperatorList(rightop, sub, false)
  62. } else {
  63. cnt += leftValuePtr.findOperatorList(leftop, sub, true)
  64. cnt += rightValuePtr.findOperatorList(rightop, sub, true)
  65. }
  66. }
  67. }
  68. if cnt == len(node.valNodeList){
  69. canFind = true
  70. break
  71. }
  72. }
  73. }
  74. }
  75. return canFind
  76. }
  77. // To Insert sub#
  78. func (manager *sub_manager) addSubscription(msg MsgUnit) error {
  79. topic := msg.(SubscriptionMsg).Topic
  80. value := msg.(SubscriptionMsg).Value
  81. operator := msg.(SubscriptionMsg).Operator
  82. subnumber := 0
  83. // 0. Check if same IP & same <Topic, Value> exists
  84. if manager.isDuplicated(msg) == true {
  85. return errors.New("Duplicater Subscription")
  86. }
  87. // 1. Mapping incoming IP address to sub #
  88. if len(manager.emptylist) == 0 {
  89. subnumber = manager.count_sub
  90. manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], subnumber)
  91. manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
  92. manager.count_sub++
  93. } else {
  94. subnumber := manager.emptylist[len(manager.emptylist)-1]
  95. manager.emptylist = manager.emptylist[:len(manager.emptylist)-1]
  96. manager.ip2sub[msg.(SubscriptionMsg).From] = append(manager.ip2sub[msg.(SubscriptionMsg).From], subnumber)
  97. manager.sub2ip[subnumber] = msg.(SubscriptionMsg).From
  98. }
  99. topicptr := manager.list.head
  100. existTopic := false
  101. // 2. Add Subscription
  102. // Find topic in topiclist, add if not found
  103. for topicptr != nil {
  104. if Compare(topicptr.topic, topic) == 0 {
  105. existTopic = true
  106. break
  107. }
  108. topicptr = topicptr.next
  109. }
  110. if !existTopic {
  111. manager.list.addTopicNode(topic)
  112. topicptr = manager.list.tail
  113. }
  114. var addValNodeList []*valueNode
  115. // if single expression
  116. if len(operator) == 1 {
  117. valptr := topicptr.list.getValueNodePos(value)
  118. op := operator[0]
  119. if valptr == nil {
  120. topicptr.list.addValueNode(value)
  121. valptr = topicptr.list.tail
  122. }
  123. addValNodeList = append(addValNodeList, valptr)
  124. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
  125. manager.israngesub[subnumber] = false
  126. valptr.insertSub(op, subnumber, true)
  127. return nil // AddSubscription ok
  128. } else {
  129. // if Multi expression
  130. // (ex) : { (234 < x) && (x <= 1293) } , { (234 < x) || (x < 1293) }
  131. leftOperator := operator[0]
  132. logicalOperator := operator[1] // For compound expressions bounded by '&&' and '||'
  133. rightOperator := operator[2]
  134. // Find ValueNode = (topiclist[topic].list.val == value)
  135. valptr1 := topicptr.list.getValueNodePos([]int64{value[0]})
  136. valptr2 := topicptr.list.getValueNodePos([]int64{value[1]})
  137. if valptr1 == nil {
  138. topicptr.list.addValueNode([]int64{value[0]})
  139. valptr1 = topicptr.list.tail
  140. }
  141. if valptr2 == nil {
  142. topicptr.list.addValueNode([]int64{value[1]})
  143. valptr2 = topicptr.list.tail
  144. }
  145. addValNodeList = append(addValNodeList, valptr1)
  146. addValNodeList = append(addValNodeList, valptr2)
  147. manager.sub2node[subnumber] = append(manager.sub2node[subnumber], nodeInfo{addValNodeList, topic})
  148. if logicalOperator == "&&" {
  149. // If they are enclosed in '&&' -> Insert Value to range_operator_list
  150. manager.israngesub[subnumber] = true
  151. valptr1.insertSub(leftOperator, subnumber, false)
  152. valptr2.insertSub(rightOperator, subnumber, false)
  153. } else {
  154. // if they are enclosed in '||' -> Insert Value to single_operator_list
  155. manager.israngesub[subnumber] = false
  156. valptr1.insertSub(leftOperator, subnumber, true)
  157. valptr2.insertSub(rightOperator, subnumber, true)
  158. }
  159. return nil // addSubscription ok
  160. }
  161. return errors.New("Can't addSubscription")
  162. }
  163. // To delete subscriptions
  164. func (manager *sub_manager) delete(from string) error {
  165. ip := from
  166. cand := manager.ip2sub[ip]
  167. for i := 0; i < len(cand); i++ {
  168. sub := cand[i]
  169. for j := 0; j < len(manager.sub2node[sub]); j++ {
  170. nodeinfo := manager.sub2node[sub][j]
  171. node := nodeinfo.valNodeList
  172. if manager.israngesub[sub] {
  173. for k := 0; k < len(node); k++ {
  174. pos := findSub(node[k].range2sub_s, sub)
  175. if pos != -1 {
  176. node[k].range2sub_s = remove(node[k].range2sub_s, pos)
  177. manager.emptylist = append(manager.emptylist, sub)
  178. }
  179. pos = findSub(node[k].range2sub_es, sub)
  180. if pos != -1 {
  181. node[k].range2sub_es = remove(node[k].range2sub_es, pos)
  182. manager.emptylist = append(manager.emptylist, sub)
  183. }
  184. pos = findSub(node[k].range2sub_b, sub)
  185. if pos != -1 {
  186. node[k].range2sub_b = remove(node[k].range2sub_b, pos)
  187. manager.emptylist = append(manager.emptylist, sub)
  188. }
  189. pos = findSub(node[k].range2sub_eb, sub)
  190. if pos != -1 {
  191. node[k].range2sub_eb = remove(node[k].range2sub_eb, pos)
  192. manager.emptylist = append(manager.emptylist, sub)
  193. }
  194. }
  195. } else {
  196. for k := 0; k < len(node); k++ {
  197. pos := findSub(node[k].single2sub_s, sub)
  198. if pos != -1 {
  199. node[k].single2sub_s = remove(node[k].single2sub_s, pos)
  200. manager.emptylist = append(manager.emptylist, sub)
  201. }
  202. pos = findSub(node[k].single2sub_es, sub)
  203. if pos != -1 {
  204. node[k].single2sub_es = remove(node[k].single2sub_es, pos)
  205. manager.emptylist = append(manager.emptylist, sub)
  206. }
  207. pos = findSub(node[k].single2sub_b, sub)
  208. if pos != -1 {
  209. node[k].single2sub_b = remove(node[k].single2sub_b, pos)
  210. manager.emptylist = append(manager.emptylist, sub)
  211. }
  212. pos = findSub(node[k].single2sub_eb, sub)
  213. if pos != -1 {
  214. node[k].single2sub_eb = remove(node[k].single2sub_eb, pos)
  215. manager.emptylist = append(manager.emptylist, sub)
  216. }
  217. pos = findSub(node[k].single2sub_e, sub)
  218. if pos != -1 {
  219. node[k].single2sub_e = remove(node[k].single2sub_e, pos)
  220. manager.emptylist = append(manager.emptylist, sub)
  221. }
  222. }
  223. }
  224. }
  225. manager.ip2sub[ip] = nil // Delete sub#s mapped to Ip address
  226. return nil
  227. }
  228. return errors.New("Don't Exist Subscription to delete")
  229. }