Skip to content

Commit 8747877

Browse files
authored
Merge pull request #47 from synadia-io/scheduling-example-and-improve
Improve scheduling management and add examples
2 parents 98f2926 + 7ad9286 commit 8747877

13 files changed

Lines changed: 961 additions & 74 deletions

schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@
55

66
import io.nats.client.*;
77
import io.nats.client.api.StorageType;
8-
import io.nats.client.api.StreamInfo;
98
import io.nats.client.support.DateTimeUtils;
109
import io.synadia.sm.ScheduleManagement;
1110
import io.synadia.sm.ScheduledMessageBuilder;
1211

1312
import java.util.concurrent.CountDownLatch;
1413
import java.util.concurrent.TimeUnit;
1514

16-
import static io.synadia.examples.ScheduleUtils.report;
15+
import static io.synadia.examples.ScheduleExampleUtils.report;
1716

1817
/**
1918
* Example: build and publish a few scheduled messages using
@@ -57,50 +56,53 @@ public static void main(String[] args) {
5756
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}
5857

5958
// Use the utility to properly create a schedulable stream
60-
StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
61-
report("Created stream", si.getConfiguration());
59+
ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
6260

6361
CountDownLatch latch = new CountDownLatch(4);
6462
Dispatcher d = connection.createDispatcher();
6563

6664
// subscribe to the subject that receives the schedule message
6765
js.subscribe(SCHEDULES, d, m -> {
68-
report("SCHEDULED (received)", m);
66+
report("MONITORING via '" + SCHEDULES + "'", m);
6967
m.ack();
7068
}, false);
7169

7270
// subscribe to the target subject
7371
js.subscribe(TARGETS, d, m -> {
74-
report("TARGETED (received)", m);
72+
report("TARGETED via '" + TARGETS + "'", m);
7573
m.ack();
7674
latch.countDown();
7775
}, false);
7876

79-
report("SCHEDULE-NOW (publishing)");
77+
report("SCHEDULING " + SCHEDULE_PREFIX + "now");
8078
new ScheduledMessageBuilder()
8179
.scheduleSubject(SCHEDULE_PREFIX + "now")
8280
.targetSubject(TARGET_PREFIX + "now")
8381
.scheduleImmediate()
8482
.data("Schedule-Now")
8583
.scheduleMessage(js);
8684

87-
report("SCHEDULE-AT (publishing)");
85+
report("SCHEDULING " + SCHEDULE_PREFIX + "at");
8886
new ScheduledMessageBuilder()
8987
.scheduleSubject(SCHEDULE_PREFIX + "at")
9088
.targetSubject(TARGET_PREFIX + "at")
9189
.scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5))
9290
.data("Scheduled-At")
9391
.scheduleMessage(js);
9492

95-
report("SCHEDULE-EVERY (publishing)");
93+
report("SCHEDULING " + SCHEDULE_PREFIX + "every");
9694
new ScheduledMessageBuilder()
97-
.scheduleSubject(SCHEDULE_PREFIX + "at")
98-
.targetSubject(TARGET_PREFIX + "at")
95+
.scheduleSubject(SCHEDULE_PREFIX + "every")
96+
.targetSubject(TARGET_PREFIX + "every")
9997
.scheduleEvery(1, TimeUnit.SECONDS)
10098
.data("Every Second")
10199
.scheduleMessage(js);
102100

103101
latch.await();
102+
103+
// The "every" schedule keeps firing until it is removed.
104+
report("CANCEL " + SCHEDULE_PREFIX + "every",
105+
ScheduleManagement.cancelSchedule(jsm, SCHEDULE_PREFIX + "every", STREAM));
104106
}
105107
}
106108
catch (Exception e) {

schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
import java.util.concurrent.CountDownLatch;
1414
import java.util.concurrent.TimeUnit;
1515

16-
import static io.synadia.examples.ScheduleUtils.report;
16+
import static io.synadia.examples.ScheduleExampleUtils.report;
1717

1818
/**
1919
* Example: same scenario as {@link ScheduleBasics}, but built using
2020
* {@link io.synadia.sm.ScheduledMessageBuilder#build()} and then published
2121
* via {@link io.nats.client.JetStream#publish(io.nats.client.Message)}.
22+
* There is really no reason to do this unless you specifically want to log the
23+
* actual message.
2224
*/
2325
public class ScheduleBasicsAlternate {
2426

@@ -66,13 +68,13 @@ public static void main(String[] args) {
6668

6769
// subscribe to the subject that receives the schedule message
6870
js.subscribe(SCHEDULES, d, m -> {
69-
report("SCHEDULED (received)", m);
71+
report("MONITORING via '" + SCHEDULES + "'", m);
7072
m.ack();
7173
}, false);
7274

7375
// subscribe to the target subject
7476
js.subscribe(TARGETS, d, m -> {
75-
report("TARGETED (received)", m);
77+
report("TARGETED via '" + TARGETS + "'", m);
7678
m.ack();
7779
latch.countDown();
7880
}, false);
@@ -83,7 +85,7 @@ public static void main(String[] args) {
8385
.scheduleImmediate()
8486
.data("Schedule-Now")
8587
.build();
86-
report("SCHEDULE-NOW (publishing)", m);
88+
report("SCHEDULING " + SCHEDULE_PREFIX + "now", m);
8789
js.publish(m);
8890

8991
m = new ScheduledMessageBuilder()
@@ -92,19 +94,23 @@ public static void main(String[] args) {
9294
.scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5))
9395
.data("Scheduled-At")
9496
.build();
95-
report("SCHEDULE-AT (publishing)", m);
97+
report("SCHEDULING " + SCHEDULE_PREFIX + "at", m);
9698
js.publish(m);
9799

98100
m = new ScheduledMessageBuilder()
99-
.scheduleSubject(SCHEDULE_PREFIX + "at")
100-
.targetSubject(TARGET_PREFIX + "at")
101+
.scheduleSubject(SCHEDULE_PREFIX + "every")
102+
.targetSubject(TARGET_PREFIX + "every")
101103
.scheduleEvery(1, TimeUnit.SECONDS)
102104
.data("Every Second")
103105
.build();
104-
report("SCHEDULE-EVERY (publishing)", m);
106+
report("SCHEDULING " + SCHEDULE_PREFIX + "every", m);
105107
js.publish(m);
106108

107109
latch.await();
110+
111+
// The "every" schedule keeps firing until it is removed.
112+
report("CANCEL " + SCHEDULE_PREFIX + "every",
113+
ScheduleManagement.cancelSchedule(jsm, SCHEDULE_PREFIX + "every", STREAM));
108114
}
109115
}
110116
catch (Exception e) {
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved.
2+
// See LICENSE and NOTICE file for details.
3+
4+
package io.synadia.examples;
5+
6+
import io.nats.client.*;
7+
import io.nats.client.api.StorageType;
8+
import io.synadia.sm.ScheduleManagement;
9+
import io.synadia.sm.ScheduleManagement.Result;
10+
import io.synadia.sm.ScheduledMessageBuilder;
11+
12+
import java.time.Duration;
13+
14+
import static io.synadia.examples.ScheduleExampleUtils.report;
15+
16+
/**
17+
* Example: stop a running schedule with each
18+
* {@link ScheduleManagement#cancelSchedule cancelSchedule} overload.
19+
* <p>
20+
* <ul>
21+
* <li>by stream + sequence — the lowest-level call</li>
22+
* <li>by stream + subject — looks up the sequence on a known stream</li>
23+
* <li>by subject only — the helper locates the stream too</li>
24+
* </ul>
25+
* The example also shows a {@link Result#NOT_FOUND} outcome by cancelling a
26+
* subject that no longer has a schedule.
27+
*/
28+
public class ScheduleCancel {
29+
30+
/** Stream name used by this example. */
31+
public static final String STREAM = "schedules-enabled";
32+
33+
/** Prefix for all schedule subjects in this example. */
34+
public static final String SCHEDULE_PREFIX = "schedule.";
35+
36+
/** Prefix for all target subjects in this example. */
37+
public static final String TARGET_PREFIX = "target.";
38+
39+
private static final String SCHEDULES = SCHEDULE_PREFIX + ">";
40+
private static final String TARGETS = TARGET_PREFIX + "*";
41+
42+
/** Subject patterns the example stream accepts. */
43+
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS};
44+
45+
private ScheduleCancel() {}
46+
47+
/**
48+
* Example entry point.
49+
* @param args ignored
50+
*/
51+
public static void main(String[] args) {
52+
try {
53+
Options options = new Options.Builder()
54+
.server("nats://localhost:4222")
55+
.errorListener(new ErrorListener() {})
56+
.build();
57+
58+
try (Connection connection = Nats.connect(options)) {
59+
JetStreamManagement jsm = connection.jetStreamManagement();
60+
JetStream js = connection.jetStream();
61+
62+
// delete the stream in case it existed, just for a fresh example
63+
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}
64+
65+
ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
66+
67+
String bySeqSubject = SCHEDULE_PREFIX + "by-seq";
68+
String bySubjectSubject = SCHEDULE_PREFIX + "by-subject";
69+
String byLookupSubject = SCHEDULE_PREFIX + "by-lookup";
70+
71+
// Schedule each one an hour out so it can't fire while the example runs.
72+
long bySeq = new ScheduledMessageBuilder()
73+
.scheduleSubject(bySeqSubject)
74+
.targetSubject(TARGET_PREFIX + "by-seq")
75+
.scheduleIn(Duration.ofHours(1))
76+
.data("By-Seq")
77+
.scheduleMessage(js);
78+
report("SCHEDULED " + bySeqSubject + " at sequence " + bySeq);
79+
80+
new ScheduledMessageBuilder()
81+
.scheduleSubject(bySubjectSubject)
82+
.targetSubject(TARGET_PREFIX + "by-subject")
83+
.scheduleIn(Duration.ofHours(1))
84+
.data("By-Subject")
85+
.scheduleMessage(js);
86+
report("SCHEDULED " + bySubjectSubject);
87+
88+
new ScheduledMessageBuilder()
89+
.scheduleSubject(byLookupSubject)
90+
.targetSubject(TARGET_PREFIX + "by-lookup")
91+
.scheduleIn(Duration.ofHours(1))
92+
.data("By-Lookup")
93+
.scheduleMessage(js);
94+
report("SCHEDULED " + byLookupSubject);
95+
96+
// 1) Sequence-based cancel.
97+
Result r1 = ScheduleManagement.cancelSchedule(jsm, STREAM, bySeq);
98+
report("CANCEL by sequence", r1);
99+
100+
// 2) Subject + stream cancel.
101+
Result r2 = ScheduleManagement.cancelSchedule(jsm, bySubjectSubject, STREAM);
102+
report("CANCEL by subject + stream", r2);
103+
104+
// 3) Subject-only cancel; the helper locates the stream.
105+
Result r3 = ScheduleManagement.cancelSchedule(jsm, byLookupSubject);
106+
report("CANCEL by subject (auto-find)", r3);
107+
108+
// 4) Cancelling something that isn't there returns NOT_FOUND.
109+
Result r4 = ScheduleManagement.cancelSchedule(jsm, bySeqSubject, STREAM);
110+
report("CANCEL already-cancelled", r4);
111+
}
112+
}
113+
catch (Exception e) {
114+
e.printStackTrace();
115+
}
116+
}
117+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved.
2+
// See LICENSE and NOTICE file for details.
3+
4+
package io.synadia.examples;
5+
6+
import io.nats.client.*;
7+
import io.nats.client.api.StorageType;
8+
import io.synadia.sm.ScheduleManagement;
9+
import io.synadia.sm.ScheduledMessageBuilder;
10+
11+
import java.util.concurrent.CountDownLatch;
12+
13+
import static io.synadia.examples.ScheduleExampleUtils.report;
14+
15+
/**
16+
* Example: schedule using cron expressions, including the
17+
* {@link io.synadia.sm.ScheduledMessageBuilder#scheduleCron(String, String)}
18+
* variant that takes an IANA time zone.
19+
* <p>
20+
* NATS schedules use a six-field cron form (second minute hour day month
21+
* day-of-week) per ADR-51. The expressions below fire on short intervals
22+
* so the example completes quickly; the time-zone-bound expression behaves
23+
* identically here because it does not pin a time of day, but the call shape
24+
* is the same one you would use for {@code "0 30 9 * * *"} ("9:30 every day
25+
* in New York").
26+
*/
27+
public class ScheduleCron {
28+
29+
/** Stream name used by this example. */
30+
public static final String STREAM = "schedules-enabled";
31+
32+
/** Prefix for all schedule subjects in this example. */
33+
public static final String SCHEDULE_PREFIX = "schedule.";
34+
35+
/** Prefix for all target subjects in this example. */
36+
public static final String TARGET_PREFIX = "target.";
37+
38+
private static final String SCHEDULES = SCHEDULE_PREFIX + ">";
39+
private static final String TARGETS = TARGET_PREFIX + "*";
40+
41+
/** Subject patterns the example stream accepts. */
42+
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS};
43+
44+
private ScheduleCron() {}
45+
46+
/**
47+
* Example entry point.
48+
* @param args ignored
49+
*/
50+
public static void main(String[] args) {
51+
try {
52+
Options options = new Options.Builder()
53+
.server("nats://localhost:4222")
54+
.errorListener(new ErrorListener() {})
55+
.build();
56+
57+
try (Connection connection = Nats.connect(options)) {
58+
JetStreamManagement jsm = connection.jetStreamManagement();
59+
JetStream js = connection.jetStream();
60+
61+
// delete the stream in case it existed, just for a fresh example
62+
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}
63+
64+
ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
65+
66+
CountDownLatch latch = new CountDownLatch(4);
67+
Dispatcher d = connection.createDispatcher();
68+
69+
js.subscribe(TARGETS, d, m -> {
70+
report("TARGETED via '" + TARGETS + "'", m);
71+
m.ack();
72+
latch.countDown();
73+
}, false);
74+
75+
String cronSubject = SCHEDULE_PREFIX + "cron";
76+
String cronTzSubject = SCHEDULE_PREFIX + "cron-tz";
77+
78+
// Six-field cron: every two seconds.
79+
report("SCHEDULING " + cronSubject + " with cron '*/2 * * * * *'");
80+
new ScheduledMessageBuilder()
81+
.scheduleSubject(cronSubject)
82+
.targetSubject(TARGET_PREFIX + "cron")
83+
.scheduleCron("*/2 * * * * *")
84+
.data("Cron-Every-2s")
85+
.scheduleMessage(js);
86+
87+
// Same expression, evaluated in a specific IANA time zone.
88+
report("SCHEDULING " + cronTzSubject + " with cron '*/3 * * * * *' (America/New_York)");
89+
new ScheduledMessageBuilder()
90+
.scheduleSubject(cronTzSubject)
91+
.targetSubject(TARGET_PREFIX + "cron-tz")
92+
.scheduleCron("*/3 * * * * *", "America/New_York")
93+
.data("Cron-Every-3s-NY")
94+
.scheduleMessage(js);
95+
96+
latch.await();
97+
98+
// Cron schedules keep firing until they are removed.
99+
report("CANCEL " + cronSubject, ScheduleManagement.cancelSchedule(jsm, cronSubject, STREAM));
100+
report("CANCEL " + cronTzSubject, ScheduleManagement.cancelSchedule(jsm, cronTzSubject, STREAM));
101+
}
102+
}
103+
catch (Exception e) {
104+
e.printStackTrace();
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)