22// (c)2017 MindView LLC: see Copyright.txt
33// We make no guarantees that this code is fit for any purpose.
44// Visit http://OnJava8.com for more book information.
5- import java .util .concurrent .*;
65import java .util .*;
6+ import java .util .stream .*;
7+ import java .util .concurrent .*;
78import static java .util .concurrent .TimeUnit .*;
89
910class DelayedTask implements Runnable , Delayed {
@@ -12,7 +13,7 @@ class DelayedTask implements Runnable, Delayed {
1213 private final int delta ;
1314 private final long trigger ;
1415 protected static List <DelayedTask > sequence =
15- new ArrayList <>();
16+ new CopyOnWriteArrayList <>();
1617 public DelayedTask (int delayInMilliseconds ) {
1718 delta = delayInMilliseconds ;
1819 trigger = System .nanoTime () +
@@ -32,74 +33,69 @@ public int compareTo(Delayed arg) {
3233 return 0 ;
3334 }
3435 @ Override
35- public void run () { System .out .print (this + " " ); }
36+ public void run () {
37+ System .out .print (this + " " );
38+ }
3639 @ Override
3740 public String toString () {
38- return String . format ( "[%1$-4d]" , delta ) +
39- " Task " + id ;
41+ return
42+ String . format ( "[%d] Task %d" , delta , id ) ;
4043 }
4144 public String summary () {
42- return "(" + id + ":" + delta + ")" ;
45+ return String . format ( "(%d:%d)" , id , delta ) ;
4346 }
44- public static class EndSentinel extends DelayedTask {
45- private ExecutorService exec ;
46- public EndSentinel (int delay , ExecutorService e ) {
47- super (delay );
48- exec = e ;
49- }
47+ public static class EndTask extends DelayedTask {
48+ public EndTask (int delay ) { super (delay ); }
5049 @ Override
5150 public void run () {
52- for (DelayedTask pt : sequence ) {
53- System .out .print (pt .summary () + " " );
54- }
55- System .out .println ();
56- System .out .println (this + " Calling shutdownNow()" );
57- exec .shutdownNow ();
58- }
59- }
60- }
61-
62- class DelayedTaskConsumer implements Runnable {
63- private DelayQueue <DelayedTask > q ;
64- public DelayedTaskConsumer (DelayQueue <DelayedTask > q ) {
65- this .q = q ;
66- }
67- @ Override
68- public void run () {
69- try {
70- while (!Thread .interrupted ())
71- q .take ().run (); // Run task with current thread
72- } catch (InterruptedException e ) {
73- // Acceptable way to exit
51+ sequence .forEach (dt ->
52+ System .out .println (dt .summary () + " " ));
7453 }
75- System .out .println ("Finished DelayedTaskConsumer" );
7654 }
7755}
7856
7957public class DelayQueueDemo {
80- public static void main (String [] args ) {
81- SplittableRandom rand = new SplittableRandom (47 );
82- ExecutorService es = Executors .newCachedThreadPool ();
83- DelayQueue <DelayedTask > queue =
84- new DelayQueue <>();
85- // Fill with tasks that have random delays:
86- for (int i = 0 ; i < 20 ; i ++)
87- queue .put (new DelayedTask (rand .nextInt (5000 )));
88- // Set the stopping point
89- queue .add (new DelayedTask .EndSentinel (5000 , es ));
90- es .execute (new DelayedTaskConsumer (queue ));
58+ public static void
59+ main (String [] args ) throws Exception {
60+ DelayQueue <DelayedTask > tasks =
61+ Stream .concat (
62+ // Tasks with random delays:
63+ new Random (47 ).ints (20 , 0 , 4000 )
64+ .mapToObj (DelayedTask ::new ),
65+ // Add the summarizing task:
66+ Stream .of (
67+ new DelayedTask .EndTask (4000 )))
68+ .collect (Collectors
69+ .toCollection (DelayQueue ::new ));
70+ DelayQueue <DelayedTask > delayQueue =
71+ new DelayQueue <>(tasks );
72+ while (delayQueue .size () > 0 )
73+ delayQueue .take ().run ();
9174 }
9275}
9376/* Output:
94- [70 ] Task 10 [125 ] Task 13 [267 ] Task 19 [635 ] Task 0
95- [650 ] Task 16 [682 ] Task 17 [807 ] Task 11 [1131] Task 18
96- [1177] Task 4 [1193] Task 9 [1634] Task 15 [1656] Task 6
97- [2400] Task 12 [3479] Task 5 [3737] Task 1 [3768] Task 7
98- [3941] Task 2 [4720] Task 3 [4762] Task 14 [4948] Task 8
99- (0:635) (1:3737) (2:3941) (3:4720) (4:1177) (5:3479)
100- (6:1656) (7:3768) (8:4948) (9:1193) (10:70) (11:807)
101- (12:2400) (13:125) (14:4762) (15:1634) (16:650) (17:682)
102- (18:1131) (19:267) (20:5000)
103- [5000] Task 20 Calling shutdownNow()
104- Finished DelayedTaskConsumer
77+ [128] Task 12 [429] Task 6 [551] Task 13 [555] Task 2 [693] Task 3 [809] Task 15
78+ [961] Task 5 [1258] Task 1 [1258] Task 20 [1520] Task 19 [1861] Task 4 [1998] T
79+ ask 17 [2200] Task 8 [2207] Task 10 [2288] Task 11 [2522] Task 9 [2589] Task 14
80+ [2861] Task 18 [2868] Task 7 [3278] Task 16 (0:4000)
81+ (1:1258)
82+ (2:555)
83+ (3:693)
84+ (4:1861)
85+ (5:961)
86+ (6:429)
87+ (7:2868)
88+ (8:2200)
89+ (9:2522)
90+ (10:2207)
91+ (11:2288)
92+ (12:128)
93+ (13:551)
94+ (14:2589)
95+ (15:809)
96+ (16:3278)
97+ (17:1998)
98+ (18:2861)
99+ (19:1520)
100+ (20:1258)
105101*/
0 commit comments