diff --git a/bigtable/beam/workload-generator/README.md b/bigtable/beam/workload-generator/README.md index b30129bef01..91acc4a7a3c 100644 --- a/bigtable/beam/workload-generator/README.md +++ b/bigtable/beam/workload-generator/README.md @@ -90,7 +90,7 @@ If you would like to modify this and run it yourself you can use these commands: ``` mvn compile exec:java -Dexec.mainClass=WorkloadGenerator \ - "-Dexec.args=--bigtableInstanceId=$INSTANCE_ID =--bigtableTableId=$TABLE_ID \ + "-Dexec.args=--bigtableInstanceId=$INSTANCE_ID --bigtableTableId=$TABLE_ID \ --runner=dataflow --project=$GOOGLE_CLOUD_PROJECT \ --region=$REGION" \ --workloadRate=$WORKLOAD_RATE diff --git a/bigtable/beam/workload-generator/pom.xml b/bigtable/beam/workload-generator/pom.xml index bc0ffc3037d..8d29eefb9aa 100644 --- a/bigtable/beam/workload-generator/pom.xml +++ b/bigtable/beam/workload-generator/pom.xml @@ -87,6 +87,12 @@ org.apache.beam beam-runners-google-cloud-dataflow-java ${apache_beam.version} + + + io.grpc + grpc-netty + + org.apache.beam @@ -98,15 +104,21 @@ com.google.guava guava - 31.0.1-jre + 31.1-jre com.google.cloud.bigtable bigtable-hbase-beam - 2.2.0 + 2.4.0 + + com.google.cloud + google-cloud-dataflow + 0.6.0 + test + junit junit diff --git a/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java b/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java index 69680c33306..b11838967dd 100644 --- a/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java +++ b/bigtable/beam/workload-generator/src/main/java/bigtable/WorkloadGenerator.java @@ -68,6 +68,7 @@ static PipelineResult generateWorkload(BigtableWorkloadOptions options) { ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1); exec.schedule(() -> { try { + System.out.println("Cancelling job."); cancelJob(options, (DataflowPipelineJob) pipelineResult); } catch (IOException e) { e.printStackTrace(); diff --git a/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java b/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java index e5403760f64..67a738876e9 100644 --- a/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java +++ b/bigtable/beam/workload-generator/src/test/java/bigtable/WorkloadGeneratorTest.java @@ -31,6 +31,10 @@ import com.google.bigtable.repackaged.com.google.protobuf.util.Timestamps; import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration; import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import com.google.dataflow.v1beta3.FlexTemplatesServiceClient; +import com.google.dataflow.v1beta3.LaunchFlexTemplateParameter; +import com.google.dataflow.v1beta3.LaunchFlexTemplateRequest; +import com.google.dataflow.v1beta3.LaunchFlexTemplateResponse; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; @@ -132,7 +136,7 @@ public void testGenerateWorkload() { p.run().waitUntilFinish(); String output = bout.toString(); - assertThat(output.contains("Connected to table")); + assertThat(output).contains("Connected to table"); } @Test @@ -181,13 +185,46 @@ public void testPipeline() throws IOException, InterruptedException { startRequestCount = ts.getPoints(0).getValue().getInt64Value(); endRequestCount = ts.getPoints(ts.getPointsCount() - 1).getValue().getInt64Value(); } - assertThat(endRequestCount - startRequestCount > rate); + assertThat(endRequestCount - startRequestCount > rate).isTrue(); - // Stop the running job. + // Ensure the job is stopped after duration. String jobId = ((DataflowPipelineJob) pipelineResult).getJobId(); DataflowClient client = DataflowClient.create(options); Job job = client.getJob(jobId); - assertThat(job.getCurrentState().equals("JOB_STATE_CANCELLED")); + assertThat(job.getCurrentState()).matches("JOB_STATE_CANCELLED"); + } + + @Test + public void testDeployedPipeline() throws IOException, InterruptedException { + FlexTemplatesServiceClient flexTemplatesServiceClient = + FlexTemplatesServiceClient.create(); + LaunchFlexTemplateRequest request = + LaunchFlexTemplateRequest.newBuilder() + .setProjectId(projectId) + .setLaunchParameter( + LaunchFlexTemplateParameter.newBuilder() + .setContainerSpecGcsPath( + "gs://cloud-bigtable-dataflow-templates/generate-workload.json") + .setJobName("generate-workload" + UUID.randomUUID().toString().substring(0, 20)) + .putParameters("bigtableInstanceId", instanceId) + .putParameters("bigtableTableId", TABLE_ID) + .build()) + .build(); + + LaunchFlexTemplateResponse response = flexTemplatesServiceClient.launchFlexTemplate(request); + + String jobId = response.getJob().getId(); + BigtableWorkloadOptions options = PipelineOptionsFactory.create() + .as(BigtableWorkloadOptions.class); + DataflowClient client = DataflowClient.create(options); + + Thread.sleep(3 * 60 * 1000); + Job job = client.getJob(jobId); + assertThat(job.getCurrentState()).matches("JOB_STATE_RUNNING"); + + // Cancel job manually because test job never ends. + job.setRequestedState("JOB_STATE_CANCELLED"); + client.updateJob(jobId, job); } } diff --git a/bigtable/cassandra-migration-codelab/pom.xml b/bigtable/cassandra-migration-codelab/pom.xml index 9c714a17f69..6a27eb81b4e 100644 --- a/bigtable/cassandra-migration-codelab/pom.xml +++ b/bigtable/cassandra-migration-codelab/pom.xml @@ -53,5 +53,11 @@ 4.13.2 test + + com.google.truth + truth + 1.1.3 + test + \ No newline at end of file diff --git a/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java b/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java index 9679698e115..fe38735ff5a 100644 --- a/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java +++ b/bigtable/cassandra-migration-codelab/src/test/java/CassandraMigrationCodelabTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -import static org.hamcrest.MatcherAssert.assertThat; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertNotNull; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; @@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.UUID; -import org.hamcrest.CoreMatchers; import org.junit.Test; public class CassandraMigrationCodelabTest { @@ -66,7 +65,7 @@ public void testRunDoesNotFail() throws Exception { cassandraMigrationCodelab.run(); String output = bout.toString(); - assertThat(output, CoreMatchers.not(CoreMatchers.containsString("Error during"))); + assertThat(output).doesNotContainMatch("Error during"); adminClient.deleteTable(TABLE_ID); } diff --git a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java index bbd69577ece..20cc0f53ffa 100644 --- a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java +++ b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/ConfigureConnectionPoolTest.java @@ -16,12 +16,11 @@ package com.example.bigtable; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import java.io.ByteArrayOutputStream; import java.io.PrintStream; -import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -65,6 +64,6 @@ public void testConfigureConnectionPool() { ConfigureConnectionPool.configureConnectionPool(projectId, instanceId); String output = bout.toString(); - assertThat(output, CoreMatchers.containsString("Connected with pool size of 10")); + assertThat(output).contains("Connected with pool size of 10"); } } diff --git a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java index 5b1a18ef97f..c1421d0cc5f 100644 --- a/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java +++ b/bigtable/hbase/snippets/src/test/java/com/example/bigtable/WritesTest.java @@ -16,8 +16,8 @@ package com.example.bigtable; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import com.google.cloud.bigtable.hbase.BigtableConfiguration; import java.io.ByteArrayOutputStream; @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; -import org.hamcrest.CoreMatchers; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -93,7 +92,7 @@ public void test1_WriteSimple() { WriteSimple.writeSimple(projectId, instanceId, TABLE_ID); String output = bout.toString(); - assertThat(output, CoreMatchers.containsString("Successfully wrote row")); + assertThat(output).contains("Successfully wrote row"); } @Test @@ -101,7 +100,7 @@ public void test2_WriteBatch() { WriteBatch.writeBatch(projectId, instanceId, TABLE_ID); String output = bout.toString(); - assertThat(output, CoreMatchers.containsString("Successfully wrote 2 rows")); + assertThat(output).contains("Successfully wrote 2 rows"); } @Test @@ -109,7 +108,7 @@ public void test3_WriteConditionally() { WriteConditionally.writeConditionally(projectId, instanceId, TABLE_ID); String output = bout.toString(); - assertThat(output, CoreMatchers.containsString("Successfully updated row's os_name")); + assertThat(output).contains("Successfully updated row's os_name"); } @Test @@ -117,6 +116,6 @@ public void test4_WriteIncrement() { WriteIncrement.writeIncrement(projectId, instanceId, TABLE_ID); String output = bout.toString(); - assertThat(output, CoreMatchers.containsString("Successfully updated row")); + assertThat(output).contains("Successfully updated row"); } } diff --git a/bigtable/memorystore/src/test/java/MemcachedTest.java b/bigtable/memorystore/src/test/java/MemcachedTest.java index c40d04e9201..044307b6df9 100644 --- a/bigtable/memorystore/src/test/java/MemcachedTest.java +++ b/bigtable/memorystore/src/test/java/MemcachedTest.java @@ -15,9 +15,9 @@ */ +import static com.google.common.truth.Truth.assertThat; import static java.lang.Thread.sleep; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; @@ -26,8 +26,6 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.UUID; -import org.hamcrest.CoreMatchers; -import org.hamcrest.Matcher; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -130,17 +128,16 @@ public void testMemcached() throws InterruptedException { Memcached.main(null); String output = bout.toString(); - assertThat(output, CoreMatchers.containsString("Value fetched from Bigtable: PQ2A.190405.003")); + assertThat(output).contains("Value fetched from Bigtable: PQ2A.190405.003"); // retry (due to occasional flakiness) if we didn't yet get the result in the cache int retryCount = 0; - Matcher foundInCache = - CoreMatchers.containsString("Value fetched from cache: PQ2A.190405.003"); - while (retryCount < 5 && !foundInCache.matches(output)) { + String foundInCache = "Value fetched from cache: PQ2A.190405.003"; + while (retryCount < 5 && !output.contains(foundInCache)) { Memcached.main(null); output = bout.toString(); retryCount++; } - assertThat(output, foundInCache); + assertThat(output).contains(foundInCache); } }