Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigtable/beam/workload-generator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ If you would like to modify this and run it yourself you can use these commands:

```
mvn compile exec:java -Dexec.mainClass=WorkloadGenerator \
"-Dexec.args=--bigtableInstanceId=$INSTANCE_ID =--bigtableTableId=$TABLE_ID \
"-Dexec.args=--bigtableInstanceId=$INSTANCE_ID --bigtableTableId=$TABLE_ID \
--runner=dataflow --project=$GOOGLE_CLOUD_PROJECT \
--region=$REGION" \
--workloadRate=$WORKLOAD_RATE
Expand Down
16 changes: 14 additions & 2 deletions bigtable/beam/workload-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${apache_beam.version}</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
Expand All @@ -98,15 +104,21 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
<version>31.1-jre</version>
</dependency>

<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-beam</artifactId>
<version>2.2.0</version>
<version>2.4.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-dataflow</artifactId>
<version>0.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static PipelineResult generateWorkload(BigtableWorkloadOptions options) {
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
exec.schedule(() -> {
try {
System.out.println("Cancelling job.");
cancelJob(options, (DataflowPipelineJob) pipelineResult);
} catch (IOException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import com.google.bigtable.repackaged.com.google.protobuf.util.Timestamps;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.dataflow.v1beta3.FlexTemplatesServiceClient;
import com.google.dataflow.v1beta3.LaunchFlexTemplateParameter;
import com.google.dataflow.v1beta3.LaunchFlexTemplateRequest;
import com.google.dataflow.v1beta3.LaunchFlexTemplateResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
Expand Down Expand Up @@ -132,7 +136,7 @@ public void testGenerateWorkload() {
p.run().waitUntilFinish();

String output = bout.toString();
assertThat(output.contains("Connected to table"));
assertThat(output).contains("Connected to table");
}

@Test
Expand Down Expand Up @@ -181,13 +185,46 @@ public void testPipeline() throws IOException, InterruptedException {
startRequestCount = ts.getPoints(0).getValue().getInt64Value();
endRequestCount = ts.getPoints(ts.getPointsCount() - 1).getValue().getInt64Value();
}
assertThat(endRequestCount - startRequestCount > rate);
assertThat(endRequestCount - startRequestCount > rate).isTrue();

// Stop the running job.
// Ensure the job is stopped after duration.
String jobId = ((DataflowPipelineJob) pipelineResult).getJobId();
DataflowClient client = DataflowClient.create(options);
Job job = client.getJob(jobId);

assertThat(job.getCurrentState().equals("JOB_STATE_CANCELLED"));
assertThat(job.getCurrentState()).matches("JOB_STATE_CANCELLED");
}

@Test
public void testDeployedPipeline() throws IOException, InterruptedException {
FlexTemplatesServiceClient flexTemplatesServiceClient =
FlexTemplatesServiceClient.create();
LaunchFlexTemplateRequest request =
LaunchFlexTemplateRequest.newBuilder()
.setProjectId(projectId)
.setLaunchParameter(
LaunchFlexTemplateParameter.newBuilder()
.setContainerSpecGcsPath(
"gs://cloud-bigtable-dataflow-templates/generate-workload.json")
.setJobName("generate-workload" + UUID.randomUUID().toString().substring(0, 20))
.putParameters("bigtableInstanceId", instanceId)
.putParameters("bigtableTableId", TABLE_ID)
.build())
.build();

LaunchFlexTemplateResponse response = flexTemplatesServiceClient.launchFlexTemplate(request);

String jobId = response.getJob().getId();
BigtableWorkloadOptions options = PipelineOptionsFactory.create()
.as(BigtableWorkloadOptions.class);
DataflowClient client = DataflowClient.create(options);

Thread.sleep(3 * 60 * 1000);
Job job = client.getJob(jobId);
assertThat(job.getCurrentState()).matches("JOB_STATE_RUNNING");

// Cancel job manually because test job never ends.
job.setRequestedState("JOB_STATE_CANCELLED");
client.updateJob(jobId, job);
}
}
6 changes: 6 additions & 0 deletions bigtable/cassandra-migration-codelab/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,11 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import static org.hamcrest.MatcherAssert.assertThat;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertNotNull;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
Expand All @@ -23,7 +23,6 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

public class CassandraMigrationCodelabTest {
Expand Down Expand Up @@ -66,7 +65,7 @@ public void testRunDoesNotFail() throws Exception {
cassandraMigrationCodelab.run();

String output = bout.toString();
assertThat(output, CoreMatchers.not(CoreMatchers.containsString("Error during")));
assertThat(output).doesNotContainMatch("Error during");

adminClient.deleteTable(TABLE_ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.example.bigtable;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -65,6 +64,6 @@ public void testConfigureConnectionPool() {
ConfigureConnectionPool.configureConnectionPool(projectId, instanceId);

String output = bout.toString();
assertThat(output, CoreMatchers.containsString("Connected with pool size of 10"));
assertThat(output).contains("Connected with pool size of 10");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.example.bigtable;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.ByteArrayOutputStream;
Expand All @@ -30,7 +30,6 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -93,30 +92,30 @@ public void test1_WriteSimple() {
WriteSimple.writeSimple(projectId, instanceId, TABLE_ID);

String output = bout.toString();
assertThat(output, CoreMatchers.containsString("Successfully wrote row"));
assertThat(output).contains("Successfully wrote row");
}

@Test
public void test2_WriteBatch() {
WriteBatch.writeBatch(projectId, instanceId, TABLE_ID);

String output = bout.toString();
assertThat(output, CoreMatchers.containsString("Successfully wrote 2 rows"));
assertThat(output).contains("Successfully wrote 2 rows");
}

@Test
public void test3_WriteConditionally() {
WriteConditionally.writeConditionally(projectId, instanceId, TABLE_ID);

String output = bout.toString();
assertThat(output, CoreMatchers.containsString("Successfully updated row's os_name"));
assertThat(output).contains("Successfully updated row's os_name");
}

@Test
public void test4_WriteIncrement() {
WriteIncrement.writeIncrement(projectId, instanceId, TABLE_ID);

String output = bout.toString();
assertThat(output, CoreMatchers.containsString("Successfully updated row"));
assertThat(output).contains("Successfully updated row");
}
}
13 changes: 5 additions & 8 deletions bigtable/memorystore/src/test/java/MemcachedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/


import static com.google.common.truth.Truth.assertThat;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
Expand All @@ -26,8 +26,6 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -130,17 +128,16 @@ public void testMemcached() throws InterruptedException {
Memcached.main(null);

String output = bout.toString();
assertThat(output, CoreMatchers.containsString("Value fetched from Bigtable: PQ2A.190405.003"));
assertThat(output).contains("Value fetched from Bigtable: PQ2A.190405.003");

// retry (due to occasional flakiness) if we didn't yet get the result in the cache
int retryCount = 0;
Matcher<String> foundInCache =
CoreMatchers.containsString("Value fetched from cache: PQ2A.190405.003");
while (retryCount < 5 && !foundInCache.matches(output)) {
String foundInCache = "Value fetched from cache: PQ2A.190405.003";
while (retryCount < 5 && !output.contains(foundInCache)) {
Memcached.main(null);
output = bout.toString();
retryCount++;
}
assertThat(output, foundInCache);
assertThat(output).contains(foundInCache);
}
}