2222import com .google .cloud .bigquery .storage .v1beta1 .AvroProto .AvroRows ;
2323import com .google .cloud .bigquery .storage .v1beta1 .BigQueryStorageClient ;
2424import com .google .cloud .bigquery .storage .v1beta1 .ReadOptions .TableReadOptions ;
25+ import com .google .cloud .bigquery .storage .v1beta1 .Storage ;
2526import com .google .cloud .bigquery .storage .v1beta1 .Storage .CreateReadSessionRequest ;
2627import com .google .cloud .bigquery .storage .v1beta1 .Storage .DataFormat ;
2728import com .google .cloud .bigquery .storage .v1beta1 .Storage .ReadRowsRequest ;
4041import org .apache .avro .io .DatumReader ;
4142import org .apache .avro .io .DecoderFactory ;
4243
43-
4444public class StorageSample {
4545
4646 /*
@@ -68,8 +68,9 @@ public SimpleRowReader(Schema schema) {
6868 * @param avroRows object returned from the ReadRowsResponse.
6969 */
7070 public void processRows (AvroRows avroRows ) throws IOException {
71- decoder = DecoderFactory .get ()
72- .binaryDecoder (avroRows .getSerializedBinaryRows ().toByteArray (), decoder );
71+ decoder =
72+ DecoderFactory .get ()
73+ .binaryDecoder (avroRows .getSerializedBinaryRows ().toByteArray (), decoder );
7374
7475 while (!decoder .isEnd ()) {
7576 // Reusing object row
@@ -92,60 +93,67 @@ public static void main(String... args) throws Exception {
9293 String parent = String .format ("projects/%s" , projectId );
9394
9495 // This example uses baby name data from the public datasets.
95- TableReference tableReference = TableReference .newBuilder ()
96- .setProjectId ("bigquery-public-data" )
97- .setDatasetId ("usa_names" )
98- .setTableId ("usa_1910_current" )
99- .build ();
96+ TableReference tableReference =
97+ TableReference .newBuilder ()
98+ .setProjectId ("bigquery-public-data" )
99+ .setDatasetId ("usa_names" )
100+ .setTableId ("usa_1910_current" )
101+ .build ();
100102
101103 // We specify the columns to be projected by adding them to the selected fields,
102104 // and set a simple filter to restrict which rows are transmitted.
103- TableReadOptions options = TableReadOptions .newBuilder ()
104- .addSelectedFields ("name" )
105- .addSelectedFields ("number" )
106- .addSelectedFields ("state" )
107- .setRowRestriction ("state = \" WA\" " )
108- .build ();
105+ TableReadOptions options =
106+ TableReadOptions .newBuilder ()
107+ .addSelectedFields ("name" )
108+ .addSelectedFields ("number" )
109+ .addSelectedFields ("state" )
110+ .setRowRestriction ("state = \" WA\" " )
111+ .build ();
109112
110113 // Begin building the session request.
111- CreateReadSessionRequest .Builder builder = CreateReadSessionRequest .newBuilder ()
112- .setParent (parent )
113- .setTableReference (tableReference )
114- .setReadOptions (options )
115- .setRequestedStreams (1 )
116- .setFormat (DataFormat .AVRO );
114+ CreateReadSessionRequest .Builder builder =
115+ CreateReadSessionRequest .newBuilder ()
116+ .setParent (parent )
117+ .setTableReference (tableReference )
118+ .setReadOptions (options )
119+ // This API can also deliver data serialized in Apache Arrow format.
120+ // This example leverages Apache Avro.
121+ .setFormat (DataFormat .AVRO )
122+ // We use a LIQUID strategy in this example because we only
123+ // read from a single stream. Consider BALANCED if you're consuming
124+ // multiple streams concurrently and want more consistent stream sizes.
125+ .setShardingStrategy (Storage .ShardingStrategy .LIQUID )
126+ .setRequestedStreams (1 )
127+ .setFormat (DataFormat .AVRO );
117128
118129 // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
119130 if (snapshotMillis != null ) {
120- Timestamp t = Timestamp .newBuilder ()
121- .setSeconds (snapshotMillis / 1000 )
122- .setNanos ((int ) ((snapshotMillis % 1000 ) * 1000000 ))
123- .build ();
124- TableModifiers modifiers = TableModifiers .newBuilder ()
125- .setSnapshotTime (t )
126- .build ();
131+ Timestamp t =
132+ Timestamp .newBuilder ()
133+ .setSeconds (snapshotMillis / 1000 )
134+ .setNanos ((int ) ((snapshotMillis % 1000 ) * 1000000 ))
135+ .build ();
136+ TableModifiers modifiers = TableModifiers .newBuilder ().setSnapshotTime (t ).build ();
127137 builder .setTableModifiers (modifiers );
128138 }
129139
130140 // Request the session creation.
131141 ReadSession session = client .createReadSession (builder .build ());
132142
133- SimpleRowReader reader = new SimpleRowReader (
134- new Schema .Parser ().parse (session .getAvroSchema ().getSchema ()));
143+ SimpleRowReader reader =
144+ new SimpleRowReader ( new Schema .Parser ().parse (session .getAvroSchema ().getSchema ()));
135145
136146 // Assert that there are streams available in the session. An empty table may not have
137147 // data available. If no sessions are available for an anonymous (cached) table, consider
138148 // writing results of a query to a named table rather than consuming cached results directly.
139149 Preconditions .checkState (session .getStreamsCount () > 0 );
140150
141151 // Use the first stream to perform reading.
142- StreamPosition readPosition = StreamPosition .newBuilder ()
143- .setStream (session .getStreams (0 ))
144- .build ();
152+ StreamPosition readPosition =
153+ StreamPosition .newBuilder ().setStream (session .getStreams (0 )).build ();
145154
146- ReadRowsRequest readRowsRequest = ReadRowsRequest .newBuilder ()
147- .setReadPosition (readPosition )
148- .build ();
155+ ReadRowsRequest readRowsRequest =
156+ ReadRowsRequest .newBuilder ().setReadPosition (readPosition ).build ();
149157
150158 // Process each block of rows as they arrive and decode using our simple row reader.
151159 ServerStream <ReadRowsResponse > stream = client .readRowsCallable ().call (readRowsRequest );
0 commit comments