33namespace Workbunny \WebmanSharedCache \Traits ;
44
55use Closure ;
6+ use MongoDB \Driver \Exception \RuntimeException ;
67use Workbunny \WebmanSharedCache \Cache ;
78use Workbunny \WebmanSharedCache \Future ;
89use Error ;
@@ -106,73 +107,70 @@ protected static function _ChPublish(string $key, $message, bool $store = true,
106107 {
107108 $ func = __FUNCTION__ ;
108109 $ params = func_get_args ();
109- self ::_Atomic ($ key , function () use (
110- $ key , $ message , $ func , $ params , $ store , $ workerId
111- ) {
112- /**
113- * [
114- * workerId = [
115- * 'futureId' => futureId,
116- * 'value' => array
117- * ]
118- * ]
119- */
120- $ channel = self ::_Get ($ channelName = self ::GetChannelKey ($ key ), []);
121- // 如果还没有监听器,将数据投入默认
122- if (!$ channel ) {
123- if ($ store ) {
110+ $ r = false ;
111+ return $ r && self ::_Atomic ($ key , function () use (
112+ $ key , $ message , $ func , $ params , $ store , $ workerId , &$ r
113+ ) {
114+ /**
115+ * [
116+ * workerId = [
117+ * 'futureId' => futureId,
118+ * 'value' => array
119+ * ]
120+ * ]
121+ */
122+ $ channel = self ::_Get ($ channelName = self ::GetChannelKey ($ key ), []);
123+ // 如果还没有监听器,将数据投入默认
124+ if (!$ channel ) {
125+ if ($ store ) {
126+ // 非指定workerId
127+ if ($ workerId === null ) {
128+ $ channel ['--default-- ' ]['value ' ][] = $ message ;
129+ } // 指定workerId
130+ else {
131+ $ channel [$ workerId ]['value ' ][] = $ message ;
132+ }
133+
134+ }
135+ } // 否则将消息投入到每个worker的监听器数据中
136+ else {
124137 // 非指定workerId
125138 if ($ workerId === null ) {
126- $ channel ['--default-- ' ]['value ' ][] = $ message ;
127- }
128- // 指定workerId
139+ foreach ($ channel as $ workerId => $ item ) {
140+ if ($ store or isset ($ item ['futureId ' ])) {
141+ $ channel [$ workerId ]['value ' ][] = $ message ;
142+ }
143+ }
144+ } // 指定workerId
129145 else {
130- $ channel [$ workerId ]['value ' ][] = $ message ;
131- }
132-
133- }
134- }
135- // 否则将消息投入到每个worker的监听器数据中
136- else {
137- // 非指定workerId
138- if ($ workerId === null ) {
139- foreach ($ channel as $ workerId => $ item ) {
140- if ($ store or isset ($ item ['futureId ' ])) {
146+ if ($ store or isset ($ channel [$ workerId ]['futureId ' ])) {
141147 $ channel [$ workerId ]['value ' ][] = $ message ;
142148 }
143149 }
144150 }
145- // 指定workerId
146- else {
147- if ($ store or isset ($ channel [$ workerId ]['futureId ' ])) {
148- $ channel [$ workerId ]['value ' ][] = $ message ;
149- }
150- }
151- }
152151
153- self ::_Set ($ channelName , $ channel );
154- // 使用信号监听
155- if (self ::isChannelUseSignal ()) {
156- $ list = self ::_Get (self ::$ _CHANNEL_PID_LIST , []);
157- foreach ($ list as $ pid ) {
158- self ::_Atomic (self ::$ _CHANNEL_EVENT_LIST , function () use ($ pid ) {
159- // 设置通道事件标记
160- $ channelEventList = self ::_Get (self ::$ _CHANNEL_EVENT_LIST , []);
161- $ channelEventList [$ pid ][] = 1 ;
162- self ::_Set (self ::$ _CHANNEL_EVENT_LIST , $ channelEventList );
163- // 发送信号通知进程
164- @posix_kill ($ pid , Future::$ signal );
165- });
152+ self ::_Set ($ channelName , $ channel );
153+ // 使用信号监听
154+ if (self ::isChannelUseSignal ()) {
155+ $ list = self ::_Get (self ::$ _CHANNEL_PID_LIST , []);
156+ foreach ($ list as $ pid ) {
157+ $ r = self ::_Atomic (self ::$ _CHANNEL_EVENT_LIST , function () use ($ pid ) {
158+ // 设置通道事件标记
159+ $ channelEventList = self ::_Get (self ::$ _CHANNEL_EVENT_LIST , []);
160+ $ channelEventList [$ pid ][] = 1 ;
161+ self ::_Set (self ::$ _CHANNEL_EVENT_LIST , $ channelEventList );
162+ // 发送信号通知进程
163+ @posix_kill ($ pid , Future::$ signal );
164+ });
165+ }
166166 }
167- }
168- return [
169- 'timestamp ' => microtime (true ),
170- 'method ' => $ func ,
171- 'params ' => $ params ,
172- 'result ' => null
173- ];
174- }, true );
175- return true ;
167+ return [
168+ 'timestamp ' => microtime (true ),
169+ 'method ' => $ func ,
170+ 'params ' => $ params ,
171+ 'result ' => null
172+ ];
173+ }, true );
176174 }
177175
178176 /**
@@ -194,7 +192,7 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
194192 if (isset (self ::$ _listeners [$ key ])) {
195193 throw new Error ("Channel $ key listener already exist. " );
196194 }
197- self ::_Atomic ($ key , function () use (
195+ $ r = self ::_Atomic ($ key , function () use (
198196 $ key , $ workerId , $ func , $ params , $ listener , &$ result
199197 ) {
200198 // 信号监听则注册pid
@@ -215,7 +213,7 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
215213 // 监听器回调函数
216214 $ callback = function () use ($ key , $ workerId , $ listener ) {
217215 // 原子性执行
218- self ::_Atomic ($ key , function () use ($ key , $ workerId , $ listener ) {
216+ $ r = self ::_Atomic ($ key , function () use ($ key , $ workerId , $ listener ) {
219217 // 信号监听
220218 if (self ::isChannelUseSignal ()) {
221219 $ pid = posix_getpid ();
@@ -241,6 +239,9 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
241239 }
242240
243241 });
242+ if (!$ r ) {
243+ throw new \RuntimeException ('Channel callback failed. ' );
244+ }
244245 };
245246 // 设置回调
246247 $ channel [$ workerId ]['futureId ' ] = self ::$ _listeners [$ key ] = $ result = Future::add ($ callback , [], self ::$ interval );
@@ -260,7 +261,7 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
260261 'result ' => null
261262 ];
262263 }, true );
263- return $ result ;
264+ return $ r ? $ result : false ;
264265 }
265266
266267 /**
@@ -275,7 +276,7 @@ protected static function _ChRemoveListener(string $key, $workerId, bool $remove
275276 {
276277 $ func = __FUNCTION__ ;
277278 $ params = func_get_args ();
278- self ::_Atomic ($ key , function () use (
279+ $ r = self ::_Atomic ($ key , function () use (
279280 $ key , $ workerId , $ func , $ params , $ remove
280281 ) {
281282 if ($ id = self ::$ _listeners [$ key ] ?? null ) {
@@ -319,5 +320,8 @@ protected static function _ChRemoveListener(string $key, $workerId, bool $remove
319320 'result ' => null
320321 ];
321322 }, true );
323+ if (!$ r ) {
324+ throw new RuntimeException ("Channel {$ key } listener remove failed " );
325+ }
322326 }
323327}
0 commit comments