diff --git a/bigtable/beam/workload-generator/README.md b/bigtable/beam/workload-generator/README.md
index b30129bef01..91acc4a7a3c 100644
--- a/bigtable/beam/workload-generator/README.md
+++ b/bigtable/beam/workload-generator/README.md
@@ -90,7 +90,7 @@ If you would like to modify this and run it yourself you can use these commands:
```
mvn compile exec:java -Dexec.mainClass=WorkloadGenerator \
- "-Dexec.args=--bigtableInstanceId=$INSTANCE_ID =--bigtableTableId=$TABLE_ID \
+ "-Dexec.args=--bigtableInstanceId=$INSTANCE_ID --bigtableTableId=$TABLE_ID \
--runner=dataflow --project=$GOOGLE_CLOUD_PROJECT \
--region=$REGION" \
--workloadRate=$WORKLOAD_RATE
diff --git a/bigtable/beam/workload-generator/pom.xml b/bigtable/beam/workload-generator/pom.xml
index bc0ffc3037d..8d29eefb9aa 100644
--- a/bigtable/beam/workload-generator/pom.xml
+++ b/bigtable/beam/workload-generator/pom.xml
@@ -87,6 +87,12 @@
org.apache.beam
beam-runners-google-cloud-dataflow-java
${apache_beam.version}
+
+
+ io.grpc
+ grpc-netty
+
+
org.apache.beam
@@ -98,15 +104,21 @@
com.google.guava
guava
- 31.0.1-jre
+ 31.1-jre
com.google.cloud.bigtable
bigtable-hbase-beam
- 2.2.0
+ 2.4.0
+
+ com.google.cloud
+ google-cloud-dataflow
+ 0.6.0
+ test
+
junit
junit
diff --git a/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java b/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java
index 69680c33306..b11838967dd 100644
--- a/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java
+++ b/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java
@@ -68,6 +68,7 @@ static PipelineResult generateWorkload(BigtableWorkloadOptions options) {
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.schedule(() -> {
try {
+ System.out.println("Cancelling job.");
cancelJob(options, (DataflowPipelineJob) pipelineResult);
} catch (IOException e) {
e.printStackTrace();
diff --git a/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java b/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java
index e5403760f64..67a738876e9 100644
--- a/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java
+++ b/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java
@@ -31,6 +31,10 @@
import com.google.bigtable.repackaged.com.google.protobuf.util.Timestamps;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
+import com.google.dataflow.v1beta3.FlexTemplatesServiceClient;
+import com.google.dataflow.v1beta3.LaunchFlexTemplateParameter;
+import com.google.dataflow.v1beta3.LaunchFlexTemplateRequest;
+import com.google.dataflow.v1beta3.LaunchFlexTemplateResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
@@ -132,7 +136,7 @@ public void testGenerateWorkload() {
p.run().waitUntilFinish();
String output = bout.toString();
- assertThat(output.contains("Connected to table"));
+ assertThat(output).contains("Connected to table");
}
@Test
@@ -181,13 +185,46 @@ public void testPipeline() throws IOException, InterruptedException {
startRequestCount = ts.getPoints(0).getValue().getInt64Value();
endRequestCount = ts.getPoints(ts.getPointsCount() - 1).getValue().getInt64Value();
}
- assertThat(endRequestCount - startRequestCount > rate);
+ assertThat(endRequestCount - startRequestCount > rate).isTrue();
- // Stop the running job.
+ // Ensure the job is stopped after the configured duration.
String jobId = ((DataflowPipelineJob) pipelineResult).getJobId();
DataflowClient client = DataflowClient.create(options);
Job job = client.getJob(jobId);
- assertThat(job.getCurrentState().equals("JOB_STATE_CANCELLED"));
+ assertThat(job.getCurrentState()).matches("JOB_STATE_CANCELLED");
+ }
+
+ @Test
+ public void testDeployedPipeline() throws IOException, InterruptedException {
+ FlexTemplatesServiceClient flexTemplatesServiceClient =
+ FlexTemplatesServiceClient.create();
+ LaunchFlexTemplateRequest request =
+ LaunchFlexTemplateRequest.newBuilder()
+ .setProjectId(projectId)
+ .setLaunchParameter(
+ LaunchFlexTemplateParameter.newBuilder()
+ .setContainerSpecGcsPath(
+ "gs://cloud-bigtable-dataflow-templates/generate-workload.json")
+ .setJobName("generate-workload" + UUID.randomUUID().toString().substring(0, 20))
+ .putParameters("bigtableInstanceId", instanceId)
+ .putParameters("bigtableTableId", TABLE_ID)
+ .build())
+ .build();
+
+ LaunchFlexTemplateResponse response = flexTemplatesServiceClient.launchFlexTemplate(request);
+
+ String jobId = response.getJob().getId();
+ BigtableWorkloadOptions options = PipelineOptionsFactory.create()
+ .as(BigtableWorkloadOptions.class);
+ DataflowClient client = DataflowClient.create(options);
+
+ Thread.sleep(3 * 60 * 1000);
+ Job job = client.getJob(jobId);
+ assertThat(job.getCurrentState()).matches("JOB_STATE_RUNNING");
+
+ // Cancel the job manually because the test job never ends.
+ job.setRequestedState("JOB_STATE_CANCELLED");
+ client.updateJob(jobId, job);
}
}
diff --git a/bigtable/cassandra-migration-codelab/pom.xml b/bigtable/cassandra-migration-codelab/pom.xml
index 9c714a17f69..6a27eb81b4e 100644
--- a/bigtable/cassandra-migration-codelab/pom.xml
+++ b/bigtable/cassandra-migration-codelab/pom.xml
@@ -53,5 +53,11 @@
4.13.2
test
+
+ com.google.truth
+ truth
+ 1.1.3
+ test
+
\ No newline at end of file
diff --git a/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java b/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java
index 9679698e115..fe38735ff5a 100644
--- a/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java
+++ b/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-import static org.hamcrest.MatcherAssert.assertThat;
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertNotNull;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
@@ -23,7 +23,6 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
-import org.hamcrest.CoreMatchers;
import org.junit.Test;
public class CassandraMigrationCodelabTest {
@@ -66,7 +65,7 @@ public void testRunDoesNotFail() throws Exception {
cassandraMigrationCodelab.run();
String output = bout.toString();
- assertThat(output, CoreMatchers.not(CoreMatchers.containsString("Error during")));
+ assertThat(output).doesNotContainMatch("Error during");
adminClient.deleteTable(TABLE_ID);
}
diff --git a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java
index bbd69577ece..20cc0f53ffa 100644
--- a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java
+++ b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java
@@ -16,12 +16,11 @@
package com.example.bigtable;
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
-import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -65,6 +64,6 @@ public void testConfigureConnectionPool() {
ConfigureConnectionPool.configureConnectionPool(projectId, instanceId);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString("Connected with pool size of 10"));
+ assertThat(output).contains("Connected with pool size of 10");
}
}
diff --git a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java
index 5b1a18ef97f..c1421d0cc5f 100644
--- a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java
+++ b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java
@@ -16,8 +16,8 @@
package com.example.bigtable;
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.ByteArrayOutputStream;
@@ -30,7 +30,6 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
-import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -93,7 +92,7 @@ public void test1_WriteSimple() {
WriteSimple.writeSimple(projectId, instanceId, TABLE_ID);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString("Successfully wrote row"));
+ assertThat(output).contains("Successfully wrote row");
}
@Test
@@ -101,7 +100,7 @@ public void test2_WriteBatch() {
WriteBatch.writeBatch(projectId, instanceId, TABLE_ID);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString("Successfully wrote 2 rows"));
+ assertThat(output).contains("Successfully wrote 2 rows");
}
@Test
@@ -109,7 +108,7 @@ public void test3_WriteConditionally() {
WriteConditionally.writeConditionally(projectId, instanceId, TABLE_ID);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString("Successfully updated row's os_name"));
+ assertThat(output).contains("Successfully updated row's os_name");
}
@Test
@@ -117,6 +116,6 @@ public void test4_WriteIncrement() {
WriteIncrement.writeIncrement(projectId, instanceId, TABLE_ID);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString("Successfully updated row"));
+ assertThat(output).contains("Successfully updated row");
}
}
diff --git a/bigtable/memorystore/src/test/java/MemcachedTest.java b/bigtable/memorystore/src/test/java/MemcachedTest.java
index c40d04e9201..044307b6df9 100644
--- a/bigtable/memorystore/src/test/java/MemcachedTest.java
+++ b/bigtable/memorystore/src/test/java/MemcachedTest.java
@@ -15,9 +15,9 @@
*/
+import static com.google.common.truth.Truth.assertThat;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -26,8 +26,6 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -130,17 +128,16 @@ public void testMemcached() throws InterruptedException {
Memcached.main(null);
String output = bout.toString();
- assertThat(output, CoreMatchers.containsString("Value fetched from Bigtable: PQ2A.190405.003"));
+ assertThat(output).contains("Value fetched from Bigtable: PQ2A.190405.003");
// retry (due to occasional flakiness) if we didn't yet get the result in the cache
int retryCount = 0;
- Matcher foundInCache =
- CoreMatchers.containsString("Value fetched from cache: PQ2A.190405.003");
- while (retryCount < 5 && !foundInCache.matches(output)) {
+ String foundInCache = "Value fetched from cache: PQ2A.190405.003";
+ while (retryCount < 5 && !output.contains(foundInCache)) {
Memcached.main(null);
output = bout.toString();
retryCount++;
}
- assertThat(output, foundInCache);
+ assertThat(output).contains(foundInCache);
}
}