Przeglądaj źródła

[add] 비동기적으로 queue를 pop해서 Handling할 수 있도록 init.go에 checkQueue 메소드 구현 후 run()에서 고루틴으로 실행하도록 함.

master
kidjung 4 lat temu
rodzic
commit
292a60d196
1 zmienionych plików z 9 dodań i 1 usunięć
  1. 9
    1
      src/broker/modules/init.go

+ 9
- 1
src/broker/modules/init.go Wyświetl plik

122
 	return nil
122
 	return nil
123
 }
123
 }
124
 
124
 
125
+func (moscato *Moscato) CheckQueue() MsgUnit {
126
+	for {
127
+		pmMsg := moscato.queue.pop(true)
128
+		fmt.Println("queue popped : ", pmMsg)
129
+	}
130
+}
131
+
125
 //Recieve - MM가 msg전달 받음
132
 //Recieve - MM가 msg전달 받음
126
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
133
 func (moscato *Moscato) Receive(msg MsgUnit) (MsgUnit, error) {
127
 	//rpc call
134
 	//rpc call
133
 
140
 
134
 	case PM: //Publish msg
141
 	case PM: //Publish msg
135
 		moscato.queue.push(msg.(PublishMsg))
142
 		moscato.queue.push(msg.(PublishMsg))
136
-		log.Println("popped queue: ", moscato.queue.pop(false))
137
 		log.Println("PM received")
143
 		log.Println("PM received")
138
 
144
 
139
 	case SM: //Subscription msg
145
 	case SM: //Subscription msg
197
 
203
 
198
 	//go routine -> matching 동작
204
 	//go routine -> matching 동작
199
 	go moscato.match_mng.matching(&moscato.queue)
205
 	go moscato.match_mng.matching(&moscato.queue)
206
+	go moscato.CheckQueue()
200
 
207
 
201
 	//rpc 등록 -> Receive 함수
208
 	//rpc 등록 -> Receive 함수
202
 	err = rpc.Register(receiver)
209
 	err = rpc.Register(receiver)
204
 		log.Println(err)
211
 		log.Println(err)
205
 		return
212
 		return
206
 	}
213
 	}
214
+
207
 	Listen()
215
 	Listen()
208
 	log.Println("listen complete.")
216
 	log.Println("listen complete.")
209
 	fmt.Scanln()
217
 	fmt.Scanln()

Ładowanie…
Anuluj
Zapisz