Skip to content

Commit 11739be

Browse files
adutraAlexandre Dutra
authored andcommitted
JAVA-1369: Expose workload set from DSE node metadata.
There is now now a new 'workloads' set column in DSE 5.1 . This commit makes this column available in the driver metadata. Legacy column is unchanged.
1 parent 0831250 commit 11739be

12 files changed

Lines changed: 177 additions & 205 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- [new feature] JAVA-1347: Add support for duration type.
66
- [new feature] JAVA-1248: Implement "beta" flag for native protocol v5.
77
- [new feature] JAVA-1362: Send query options flags as [int] for Protocol V5+.
8+
- [new feature] JAVA-1369: Expose workload set from DSE node metadata.
89

910

1011
### 3.1.3

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,14 @@ private static void updateInfo(Host host, Row row, Cluster.Manager cluster, bool
505505
: null;
506506
host.setListenAddress(listenAddress);
507507

508-
if (row.getColumnDefinitions().contains("workload")) {
508+
if (row.getColumnDefinitions().contains("workloads")) {
509+
Set<String> dseWorkloads = row.getSet("workloads", String.class);
510+
host.setDseWorkloads(dseWorkloads);
511+
} else if (row.getColumnDefinitions().contains("workload") && row.getString("workload") != null) {
509512
String dseWorkload = row.getString("workload");
510-
host.setDseWorkload(dseWorkload);
513+
host.setDseWorkloads(Collections.singleton(dseWorkload));
514+
} else {
515+
host.setDseWorkloads(Collections.<String>emptySet());
511516
}
512517
if (row.getColumnDefinitions().contains("graph")) {
513518
boolean isDseGraph = row.getBool("graph");
@@ -588,7 +593,7 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
588593
List<Set<Token>> allTokens = new ArrayList<Set<Token>>();
589594
List<String> dseVersions = new ArrayList<String>();
590595
List<Boolean> dseGraphEnabled = new ArrayList<Boolean>();
591-
List<String> dseWorkloads = new ArrayList<String>();
596+
List<Set<String>> dseWorkloads = new ArrayList<Set<String>>();
592597

593598
for (Row row : peersFuture.get()) {
594599
if (!isValidPeer(row, logInvalidPeers))
@@ -612,8 +617,12 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
612617
}
613618
InetAddress listenAddress = row.getColumnDefinitions().contains("listen_address") ? row.getInet("listen_address") : null;
614619
listenAddresses.add(listenAddress);
615-
String dseWorkload = row.getColumnDefinitions().contains("workload") ? row.getString("workload") : null;
616-
dseWorkloads.add(dseWorkload);
620+
if (row.getColumnDefinitions().contains("workloads"))
621+
dseWorkloads.add(row.getSet("workloads", String.class));
622+
else if (row.getColumnDefinitions().contains("workload") && row.getString("workload") != null)
623+
dseWorkloads.add(Collections.singleton(row.getString("workload")));
624+
else
625+
dseWorkloads.add(null);
617626
Boolean isDseGraph = row.getColumnDefinitions().contains("graph") ? row.getBool("graph") : null;
618627
dseGraphEnabled.add(isDseGraph);
619628
String dseVersion = row.getColumnDefinitions().contains("dse_version") ? row.getString("dse_version") : null;
@@ -648,7 +657,7 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
648657
if (dseVersions.get(i) != null)
649658
host.setDseVersion(dseVersions.get(i));
650659
if (dseWorkloads.get(i) != null)
651-
host.setDseWorkload(dseWorkloads.get(i));
660+
host.setDseWorkloads(dseWorkloads.get(i));
652661
if (dseGraphEnabled.get(i) != null)
653662
host.setDseGraphEnabled(dseGraphEnabled.get(i));
654663

driver-core/src/main/java/com/datastax/driver/core/Host.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.net.InetAddress;
2525
import java.net.InetSocketAddress;
26+
import java.util.Collections;
2627
import java.util.Set;
2728
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.concurrent.locks.ReentrantLock;
@@ -74,8 +75,8 @@ enum State {ADDED, DOWN, UP}
7475

7576
private volatile Set<Token> tokens;
7677

77-
private volatile String dseWorkload;
78-
private volatile boolean dseGraphEnabled;
78+
private volatile Set<String> dseWorkloads = Collections.emptySet();
79+
private volatile Boolean dseGraphEnabled;
7980
private volatile VersionNumber dseVersion;
8081

8182
// ClusterMetadata keeps one Host object per inet address and we rely on this (more precisely,
@@ -129,8 +130,8 @@ void setDseVersion(String dseVersion) {
129130
this.dseVersion = versionNumber;
130131
}
131132

132-
void setDseWorkload(String dseWorkload) {
133-
this.dseWorkload = dseWorkload;
133+
void setDseWorkloads(Set<String> dseWorkloads) {
134+
this.dseWorkloads = Collections.unmodifiableSet(dseWorkloads);
134135
}
135136

136137
void setDseGraphEnabled(boolean dseGraphEnabled) {
@@ -262,25 +263,63 @@ public VersionNumber getDseVersion() {
262263
}
263264

264265
/**
265-
* The DSE Workload the host is running.
266+
* The main DSE Workload the host is running.
266267
* <p/>
267268
* It is also possible for this information to be unavailable. In that case
268269
* this method returns {@code null}, and the caller should always be aware of this
269270
* possibility.
270271
*
271272
* @return the DSE workload the host is running.
273+
* @deprecated This method returns only the first workload reported by the host;
274+
* use {@link #getDseWorkloads()} instead.
272275
*/
276+
@Deprecated
273277
public String getDseWorkload() {
274-
return dseWorkload;
278+
return dseWorkloads.isEmpty() ? null : dseWorkloads.iterator().next();
279+
}
280+
281+
/**
282+
* The DSE Workloads the host is running.
283+
* <p/>
284+
* This is based on the "workload" or "workloads" columns in {@code system.local} and {@code system.peers}.
285+
* <p/>
286+
* Workload labels may vary depending on the DSE version in use;
287+
* e.g. DSE 5.1 may report two distinct workloads: {@code Search} and
288+
* {@code Analytics}, while DSE 5.0 would report a single
289+
* {@code SearchAnalytics} workload instead.
290+
* It is up to users to deal with such discrepancies;
291+
* the driver simply returns the workload labels as reported by DSE, without
292+
* any form of pre-processing.
293+
* <p/>
294+
* The returned set is immutable.
295+
* It is also possible for this information to be unavailable. In that case
296+
* this method returns an empty set, and the caller should always be aware of this
297+
* possibility.
298+
*
299+
* @return the DSE workloads the host is running.
300+
*/
301+
public Set<String> getDseWorkloads() {
302+
return dseWorkloads;
275303
}
276304

277305
/**
278306
* Returns whether the host is running DSE Graph.
307+
* <p/>
308+
* This is based on the "graph" column in {@code system.local} and {@code system.peers}.
279309
*
280310
* @return whether the node is running DSE Graph.
311+
* @deprecated As of DSE 5.1, users should determine whether
312+
* this host has the graph workload enabled by inspecting the contents of
313+
* {@link #getDseWorkloads()} instead. Note that this method will throw {@link UnsupportedOperationException} if the
314+
* system tables do not contain a "graph" column.
281315
*/
316+
@Deprecated
282317
public boolean isDseGraphEnabled() {
283-
return dseGraphEnabled;
318+
if (dseGraphEnabled == null) {
319+
throw new UnsupportedOperationException("No 'graph' column in system tables. Try getDseWorkloads() instead.");
320+
} else {
321+
return dseGraphEnabled;
322+
}
284323
}
285324

286325
/**

driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222

2323
public interface CCMAccess extends Closeable {
2424

25-
enum Workload {cassandra, solr, hadoop, spark, cfs, graph}
26-
2725
// Inspection methods
2826

2927
/**
@@ -221,7 +219,7 @@ enum Workload {cassandra, solr, hadoop, spark, cfs, graph}
221219
*
222220
* @param n the node number (starting from 1).
223221
*/
224-
void setWorkload(int n, Workload... workload);
222+
void setWorkload(int n, String... workload);
225223

226224

227225
// Methods blocking until nodes are up or down

driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ public void updateDSENodeConfig(int n, Map<String, Object> configs) {
615615
}
616616

617617
@Override
618-
public void setWorkload(int node, Workload... workload) {
618+
public void setWorkload(int node, String... workload) {
619619
String workloadStr = Joiner.on(",").join(workload);
620620
execute(CCM_COMMAND + " node%d setworkload %s", node, workloadStr);
621621
}
@@ -748,11 +748,11 @@ public static class Builder {
748748
private boolean start = true;
749749
private Boolean isDSE = null;
750750
private String version = getCassandraVersion();
751-
private Set<String> createOptions = new LinkedHashSet<String>(getInstallArguments());
752-
private Set<String> jvmArgs = new LinkedHashSet<String>();
751+
private final Set<String> createOptions = new LinkedHashSet<String>(getInstallArguments());
752+
private final Set<String> jvmArgs = new LinkedHashSet<String>();
753753
private final Map<String, Object> cassandraConfiguration = Maps.newLinkedHashMap();
754754
private final Map<String, Object> dseConfiguration = Maps.newLinkedHashMap();
755-
private Map<Integer, Workload[]> workloads = new HashMap<Integer, Workload[]>();
755+
private final Map<Integer, String[]> workloads = new HashMap<Integer, String[]>();
756756

757757
private Builder() {
758758
cassandraConfiguration.put("start_rpc", false);
@@ -893,7 +893,7 @@ public Builder withBinaryPort(int port) {
893893
* @param workload The workload(s) (e.g. solr, spark, hadoop)
894894
* @return This builder
895895
*/
896-
public Builder withWorkload(int node, Workload... workload) {
896+
public Builder withWorkload(int node, String... workload) {
897897
this.workloads.put(node, workload);
898898
return this;
899899
}
@@ -933,7 +933,7 @@ public void run() {
933933
if (!dseConfiguration.isEmpty())
934934
ccm.updateDSEConfig(dseConfiguration);
935935
}
936-
for (Map.Entry<Integer, Workload[]> entry : workloads.entrySet()) {
936+
for (Map.Entry<Integer, String[]> entry : workloads.entrySet()) {
937937
ccm.setWorkload(entry.getKey(), entry.getValue());
938938
}
939939
if (start)

driver-core/src/test/java/com/datastax/driver/core/CCMCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public void updateDSENodeConfig(int n, Map<String, Object> configs) {
201201
}
202202

203203
@Override
204-
public void setWorkload(int n, Workload... workload) {
204+
public void setWorkload(int n, String... workload) {
205205
ccm.setWorkload(n, workload);
206206
}
207207

driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18-
import com.datastax.driver.core.CCMAccess.Workload;
1918
import com.datastax.driver.core.CreateCCM.TestMode;
2019
import com.datastax.driver.core.exceptions.InvalidQueryException;
2120
import com.google.common.base.Throwables;
@@ -223,7 +222,7 @@ public void updateDSENodeConfig(int n, Map<String, Object> configs) {
223222
}
224223

225224
@Override
226-
public void setWorkload(int node, Workload... workload) {
225+
public void setWorkload(int node, String... workload) {
227226
throw new UnsupportedOperationException("This CCM cluster is read-only");
228227
}
229228

@@ -361,11 +360,11 @@ private Set<String> startOptions() {
361360
return args;
362361
}
363362

364-
private List<Workload[]> workloads() {
363+
private List<String[]> workloads() {
365364
int total = 0;
366365
for (int perDc : numberOfNodes())
367366
total += perDc;
368-
List<Workload[]> workloads = new ArrayList<Workload[]>(Collections.<Workload[]>nCopies(total, null));
367+
List<String[]> workloads = new ArrayList<String[]>(Collections.<String[]>nCopies(total, null));
369368
for (int i = annotations.size() - 1; i >= 0; i--) {
370369
CCMConfig ann = annotations.get(i);
371370
CCMWorkload[] annWorkloads = ann.workloads();
@@ -454,9 +453,9 @@ private CCMBridge.Builder ccmBuilder(Object testInstance) throws Exception {
454453
for (String arg : jvmArgs()) {
455454
ccmBuilder.withJvmArgs(arg);
456455
}
457-
List<Workload[]> workloads = workloads();
456+
List<String[]> workloads = workloads();
458457
for (int i = 0; i < workloads.size(); i++) {
459-
Workload[] workload = workloads.get(i);
458+
String[] workload = workloads.get(i);
460459
if (workload != null)
461460
ccmBuilder.withWorkload(i + 1, workload);
462461
}

driver-core/src/test/java/com/datastax/driver/core/CCMWorkload.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
/**
3131
* The workloads to assign to a specific node.
3232
*
33-
* @return The workloads to assign to a specifc node.
33+
* @return The workloads to assign to a specific node.
3434
*/
35-
CCMAccess.Workload[] value() default {};
35+
String[] value() default {};
3636

3737
}

driver-core/src/test/java/com/datastax/driver/core/DseCCMClusterTest.java

Lines changed: 0 additions & 90 deletions
This file was deleted.

driver-core/src/test/java/com/datastax/driver/core/HostAssert.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.assertj.core.api.AbstractAssert;
2222

2323
import java.net.InetAddress;
24+
import java.util.Arrays;
2425
import java.util.concurrent.Callable;
2526
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.TimeUnit;
@@ -141,13 +142,13 @@ public void onDown(Host host) {
141142
return this;
142143
}
143144

144-
public HostAssert hasWorkload(String workload) {
145-
assertThat(actual.getDseWorkload()).isNotNull().isEqualTo(workload);
145+
public HostAssert hasWorkload(String... workloads) {
146+
assertThat(actual.getDseWorkloads()).isNotNull().containsAll(Arrays.asList(workloads));
146147
return this;
147148
}
148149

149150
public HostAssert hasNoWorkload() {
150-
assertThat(actual.getDseWorkload()).isNull();
151+
assertThat(actual.getDseWorkloads()).isEmpty();
151152
return this;
152153
}
153154

0 commit comments

Comments
 (0)