Skip to content

Commit 93be81c

Browse files
committed
refactored executor code
1 parent cee0c1c commit 93be81c

3 files changed

Lines changed: 37 additions & 51 deletions

File tree

main/main.exe

100644100755
14 KB
Binary file not shown.

main/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ func timeStampMillis() int {
1414
func main() {
1515

1616
totalCount := 0
17-
const MAX_CYCLES = 5
17+
const MAX_CYCLES = 10
1818

1919
sc := utils.NewTimedExecutor(2*time.Second, 500*time.Millisecond)
2020

utils/timedexecutor.go

Lines changed: 36 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,74 +9,60 @@ import (
99
)
1010

1111
type ScheduledExecutor struct {
12-
delay time.Duration
13-
ticker time.Ticker
14-
sigs chan os.Signal
15-
shutdown bool
12+
delay time.Duration
13+
ticker *time.Ticker
14+
sigs chan os.Signal
15+
ctx context.Context
16+
cancel context.CancelFunc
1617
}
1718

18-
func NewTimedExecutor(initialDelay time.Duration, delay time.Duration) ScheduledExecutor {
19-
return ScheduledExecutor{
20-
delay: delay,
21-
ticker: *time.NewTicker(initialDelay),
19+
func NewTimedExecutor(initialDelay time.Duration, delay time.Duration) *ScheduledExecutor {
20+
return &ScheduledExecutor{
21+
delay: delay,
2222
}
2323
}
2424

25-
// Start .. process() is the function to run periodically , runAsync detects if the function should block the executor when running or not. It blocks when false
2625
func (se *ScheduledExecutor) Start(task func(), runAsync bool) {
27-
// Create a context that can be cancelled
28-
ctx, cancel := context.WithCancel(context.Background())
29-
defer cancel()
30-
31-
se.shutdown = false
26+
se.ctx, se.cancel = context.WithCancel(context.Background())
3227
se.sigs = make(chan os.Signal, 1)
3328
signal.Notify(se.sigs, syscall.SIGINT, syscall.SIGTERM)
3429

3530
go func() {
36-
<-se.sigs // Block until a signal is received
37-
cancel()
38-
}()
39-
40-
firstExec := true
41-
42-
defer func() {
43-
se.close()
44-
close(se.sigs)
45-
}()
46-
for {
47-
if se.shutdown {
31+
select {
32+
case <-se.sigs:
33+
se.cancel()
34+
case <-se.ctx.Done():
4835
return
4936
}
50-
select {
51-
case <-se.ticker.C:
37+
}()
5238

53-
if firstExec {
54-
se.ticker.Stop()
55-
se.ticker = *time.NewTicker(se.delay)
56-
firstExec = false
57-
}
39+
go func() {
40+
time.Sleep(se.delay)
41+
se.ticker = time.NewTicker(se.delay)
42+
defer se.ticker.Stop()
5843

59-
if runAsync {
60-
go task()
61-
} else {
62-
task()
63-
}
64-
case <-ctx.Done():
65-
return
66-
default:
67-
if se.shutdown {
44+
for {
45+
select {
46+
case <-se.ticker.C:
47+
if runAsync {
48+
go task()
49+
} else {
50+
task()
51+
}
52+
case <-se.ctx.Done():
6853
return
6954
}
7055
}
71-
}
72-
73-
}
56+
}()
7457

75-
func (se *ScheduledExecutor) Close() error {
76-
se.shutdown = true
77-
return nil
58+
<-se.ctx.Done()
7859
}
7960

80-
func (se *ScheduledExecutor) close() {
81-
se.ticker.Stop()
61+
func (se *ScheduledExecutor) Close() {
62+
if se.cancel != nil {
63+
se.cancel()
64+
}
65+
if se.ticker != nil {
66+
se.ticker.Stop()
67+
}
8268
}

0 commit comments

Comments
 (0)