33namespace Workbunny \WebmanSharedCache \Traits ;
44
55use Closure ;
6- use MongoDB \Driver \Exception \RuntimeException ;
7- use Workbunny \WebmanSharedCache \Cache ;
86use Workbunny \WebmanSharedCache \Future ;
97use Error ;
108
@@ -107,9 +105,8 @@ protected static function _ChPublish(string $key, $message, bool $store = true,
107105 {
108106 $ func = __FUNCTION__ ;
109107 $ params = func_get_args ();
110- $ r = false ;
111- return $ r && self ::_Atomic ($ key , function () use (
112- $ key , $ message , $ func , $ params , $ store , $ workerId , &$ r
108+ return self ::_Atomic ($ key , function () use (
109+ $ key , $ message , $ func , $ params , $ store , $ workerId
113110 ) {
114111 /**
115112 * [
@@ -154,7 +151,7 @@ protected static function _ChPublish(string $key, $message, bool $store = true,
154151 if (self ::isChannelUseSignal ()) {
155152 $ list = self ::_Get (self ::$ _CHANNEL_PID_LIST , []);
156153 foreach ($ list as $ pid ) {
157- $ r = self ::_Atomic (self ::$ _CHANNEL_EVENT_LIST , function () use ($ pid ) {
154+ self ::_Atomic (self ::$ _CHANNEL_EVENT_LIST , function () use ($ pid ) {
158155 // 设置通道事件标记
159156 $ channelEventList = self ::_Get (self ::$ _CHANNEL_EVENT_LIST , []);
160157 $ channelEventList [$ pid ][] = 1 ;
@@ -192,7 +189,7 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
192189 if (isset (self ::$ _listeners [$ key ])) {
193190 throw new Error ("Channel $ key listener already exist. " );
194191 }
195- $ r = self ::_Atomic ($ key , function () use (
192+ self ::_Atomic ($ key , function () use (
196193 $ key , $ workerId , $ func , $ params , $ listener , &$ result
197194 ) {
198195 // 信号监听则注册pid
@@ -213,7 +210,7 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
213210 // 监听器回调函数
214211 $ callback = function () use ($ key , $ workerId , $ listener ) {
215212 // 原子性执行
216- $ r = self ::_Atomic ($ key , function () use ($ key , $ workerId , $ listener ) {
213+ self ::_Atomic ($ key , function () use ($ key , $ workerId , $ listener ) {
217214 // 信号监听
218215 if (self ::isChannelUseSignal ()) {
219216 $ pid = posix_getpid ();
@@ -239,9 +236,6 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
239236 }
240237
241238 });
242- if (!$ r ) {
243- throw new \RuntimeException ('Channel callback failed. ' );
244- }
245239 };
246240 // 设置回调
247241 $ channel [$ workerId ]['futureId ' ] = self ::$ _listeners [$ key ] = $ result = Future::add ($ callback , [], self ::$ interval );
@@ -261,7 +255,7 @@ protected static function _ChCreateListener(string $key, $workerId, Closure $lis
261255 'result ' => null
262256 ];
263257 }, true );
264- return $ r ? $ result : false ;
258+ return $ result ;
265259 }
266260
267261 /**
@@ -276,7 +270,7 @@ protected static function _ChRemoveListener(string $key, $workerId, bool $remove
276270 {
277271 $ func = __FUNCTION__ ;
278272 $ params = func_get_args ();
279- $ r = self ::_Atomic ($ key , function () use (
273+ self ::_Atomic ($ key , function () use (
280274 $ key , $ workerId , $ func , $ params , $ remove
281275 ) {
282276 if ($ id = self ::$ _listeners [$ key ] ?? null ) {
@@ -320,8 +314,5 @@ protected static function _ChRemoveListener(string $key, $workerId, bool $remove
320314 'result ' => null
321315 ];
322316 }, true );
323- if (!$ r ) {
324- throw new RuntimeException ("Channel {$ key } listener remove failed " );
325- }
326317 }
327318}
0 commit comments