Skip to content

Commit 74d0201

Browse files
adutraolim7t
authored andcommitted
JAVA-1126: JAVA-1126: Handle schema changes in Mapper (apache#678)
The mapping manager subscribes to schema change events and reacts to table and UDT removed / modified notifications: a warning is logged and, for modifications, the manager tries to replace the mapper or codec with a new one (which may or may not work depending on the nature of the change and whether client code kept a reference to the old mapper instance). In addition, table/UDT names and column/field names are validated earlier, in a fail-fast attempt to detect wrong configurations.
1 parent 1c3ad00 commit 74d0201

11 files changed

Lines changed: 679 additions & 68 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- [bug] JAVA-1232: Fix NPE in IdempotenceAwareRetryPolicy.isIdempotent.
1818
- [improvement] JAVA-1227: Document "SELECT *" issue with prepared statement.
1919
- [bug] JAVA-1160: Fix NPE in VersionNumber.getPreReleaseLabels().
20+
- [improvement] JAVA-1126: Handle schema changes in Mapper.
2021

2122

2223
### 3.0.2
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (C) 2012-2015 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.driver.core;
17+
18+
/**
19+
* Base implementation for {@link SchemaChangeListener}.
20+
*/
21+
public abstract class SchemaChangeListenerBase implements SchemaChangeListener {
22+
23+
@Override
24+
public void onKeyspaceAdded(KeyspaceMetadata keyspace) {
25+
26+
}
27+
28+
@Override
29+
public void onKeyspaceRemoved(KeyspaceMetadata keyspace) {
30+
31+
}
32+
33+
@Override
34+
public void onKeyspaceChanged(KeyspaceMetadata current, KeyspaceMetadata previous) {
35+
36+
}
37+
38+
@Override
39+
public void onTableAdded(TableMetadata table) {
40+
41+
}
42+
43+
@Override
44+
public void onTableRemoved(TableMetadata table) {
45+
46+
}
47+
48+
@Override
49+
public void onTableChanged(TableMetadata current, TableMetadata previous) {
50+
51+
}
52+
53+
@Override
54+
public void onUserTypeAdded(UserType type) {
55+
56+
}
57+
58+
@Override
59+
public void onUserTypeRemoved(UserType type) {
60+
61+
}
62+
63+
@Override
64+
public void onUserTypeChanged(UserType current, UserType previous) {
65+
66+
}
67+
68+
@Override
69+
public void onFunctionAdded(FunctionMetadata function) {
70+
71+
}
72+
73+
@Override
74+
public void onFunctionRemoved(FunctionMetadata function) {
75+
76+
}
77+
78+
@Override
79+
public void onFunctionChanged(FunctionMetadata current, FunctionMetadata previous) {
80+
81+
}
82+
83+
@Override
84+
public void onAggregateAdded(AggregateMetadata aggregate) {
85+
86+
}
87+
88+
@Override
89+
public void onAggregateRemoved(AggregateMetadata aggregate) {
90+
91+
}
92+
93+
@Override
94+
public void onAggregateChanged(AggregateMetadata current, AggregateMetadata previous) {
95+
96+
}
97+
98+
@Override
99+
public void onMaterializedViewAdded(MaterializedViewMetadata view) {
100+
101+
}
102+
103+
@Override
104+
public void onMaterializedViewRemoved(MaterializedViewMetadata view) {
105+
106+
}
107+
108+
@Override
109+
public void onMaterializedViewChanged(MaterializedViewMetadata current, MaterializedViewMetadata previous) {
110+
111+
}
112+
113+
@Override
114+
public void onRegister(Cluster cluster) {
115+
116+
}
117+
118+
@Override
119+
public void onUnregister(Cluster cluster) {
120+
121+
}
122+
}

driver-mapping/src/main/java/com/datastax/driver/mapping/AnnotationParser.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package com.datastax.driver.mapping;
1717

1818
import com.datastax.driver.core.ConsistencyLevel;
19-
import com.datastax.driver.core.Metadata;
19+
import com.datastax.driver.core.TableMetadata;
2020
import com.datastax.driver.core.TypeCodec;
2121
import com.datastax.driver.core.UserType;
2222
import com.datastax.driver.mapping.MethodMapper.ParamMapper;
@@ -31,6 +31,8 @@
3131
import java.util.*;
3232
import java.util.concurrent.atomic.AtomicInteger;
3333

34+
import static com.datastax.driver.core.Metadata.quote;
35+
3436
/**
3537
* Static metods that facilitates parsing class annotations into the corresponding {@link EntityMapper}.
3638
*/
@@ -66,6 +68,10 @@ public static <T> EntityMapper<T> parseEntity(Class<T> entityClass, EntityMapper
6668
}
6769

6870
EntityMapper<T> mapper = factory.create(entityClass, ksName, tableName, writeConsistency, readConsistency);
71+
TableMetadata tableMetadata = mappingManager.getSession().getCluster().getMetadata().getKeyspace(ksName).getTable(tableName);
72+
73+
if (tableMetadata == null)
74+
throw new IllegalArgumentException(String.format("Table %s does not exist in keyspace %s", tableName, ksName));
6975

7076
List<Field> pks = new ArrayList<Field>();
7177
List<Field> ccs = new ArrayList<Field>();
@@ -106,18 +112,24 @@ public static <T> EntityMapper<T> parseEntity(Class<T> entityClass, EntityMapper
106112
validateOrder(pks, "@PartitionKey");
107113
validateOrder(ccs, "@ClusteringColumn");
108114

109-
mapper.addColumns(createColumnMappers(pks, factory, mapper.entityClass, mappingManager, columnCounter),
110-
createColumnMappers(ccs, factory, mapper.entityClass, mappingManager, columnCounter),
111-
createColumnMappers(rgs, factory, mapper.entityClass, mappingManager, columnCounter));
115+
mapper.addColumns(
116+
createColumnMappers(pks, factory, mapper.entityClass, mappingManager, columnCounter, tableMetadata, ksName, tableName),
117+
createColumnMappers(ccs, factory, mapper.entityClass, mappingManager, columnCounter, tableMetadata, ksName, tableName),
118+
createColumnMappers(rgs, factory, mapper.entityClass, mappingManager, columnCounter, tableMetadata, ksName, tableName));
112119
return mapper;
113120
}
114121

115-
private static <T> List<ColumnMapper<T>> createColumnMappers(List<Field> fields, EntityMapper.Factory factory, Class<T> klass, MappingManager mappingManager, AtomicInteger columnCounter) {
122+
private static <T> List<ColumnMapper<T>> createColumnMappers(List<Field> fields, EntityMapper.Factory factory, Class<T> klass, MappingManager mappingManager, AtomicInteger columnCounter, TableMetadata tableMetadata, String ksName, String tableName) {
116123
List<ColumnMapper<T>> mappers = new ArrayList<ColumnMapper<T>>(fields.size());
117124
for (int i = 0; i < fields.size(); i++) {
118125
Field field = fields.get(i);
119126
int pos = position(field);
120-
mappers.add(factory.createColumnMapper(klass, field, pos < 0 ? i : pos, mappingManager, columnCounter));
127+
ColumnMapper<T> columnMapper = factory.createColumnMapper(klass, field, pos < 0 ? i : pos, mappingManager, columnCounter);
128+
if (columnMapper.kind == ColumnMapper.Kind.COMPUTED || tableMetadata.getColumn(columnMapper.getColumnName()) != null)
129+
mappers.add(columnMapper);
130+
else
131+
throw new IllegalArgumentException(String.format("Column %s does not exist in table %s.%s",
132+
columnMapper.getColumnName(), ksName, tableName));
121133
}
122134
return mappers;
123135
}
@@ -126,7 +138,7 @@ public static <T> MappedUDTCodec<T> parseUDT(Class<T> udtClass, EntityMapper.Fac
126138
UDT udt = AnnotationChecks.getTypeAnnotation(UDT.class, udtClass);
127139

128140
String ksName = udt.caseSensitiveKeyspace() ? udt.keyspace() : udt.keyspace().toLowerCase();
129-
String udtName = udt.caseSensitiveType() ? Metadata.quote(udt.name()) : udt.name().toLowerCase();
141+
String udtName = udt.caseSensitiveType() ? quote(udt.name()) : udt.name().toLowerCase();
130142

131143
if (Strings.isNullOrEmpty(udt.keyspace())) {
132144
ksName = mappingManager.getSession().getLoggedKeyspace();
@@ -139,6 +151,8 @@ public static <T> MappedUDTCodec<T> parseUDT(Class<T> udtClass, EntityMapper.Fac
139151
}
140152

141153
UserType userType = mappingManager.getSession().getCluster().getMetadata().getKeyspace(ksName).getUserType(udtName);
154+
if (userType == null)
155+
throw new IllegalArgumentException(String.format("User type %s does not exist in keyspace %s", udtName, ksName));
142156

143157
List<Field> columns = new ArrayList<Field>();
144158

@@ -163,17 +177,21 @@ public static <T> MappedUDTCodec<T> parseUDT(Class<T> udtClass, EntityMapper.Fac
163177
break;
164178
}
165179
}
166-
Map<String, ColumnMapper<T>> columnMappers = createFieldMappers(columns, factory, udtClass, mappingManager, null);
180+
Map<String, ColumnMapper<T>> columnMappers = createFieldMappers(columns, factory, udtClass, mappingManager, userType, ksName);
167181
return new MappedUDTCodec<T>(userType, udtClass, columnMappers, mappingManager);
168182
}
169183

170-
private static <T> Map<String, ColumnMapper<T>> createFieldMappers(List<Field> fields, EntityMapper.Factory factory, Class<T> klass, MappingManager mappingManager, AtomicInteger columnCounter) {
184+
private static <T> Map<String, ColumnMapper<T>> createFieldMappers(List<Field> fields, EntityMapper.Factory factory, Class<T> klass, MappingManager mappingManager, UserType userType, String ksName) {
171185
Map<String, ColumnMapper<T>> mappers = Maps.newHashMapWithExpectedSize(fields.size());
172186
for (int i = 0; i < fields.size(); i++) {
173187
Field field = fields.get(i);
174188
int pos = position(field);
175-
ColumnMapper<T> mapper = factory.createColumnMapper(klass, field, pos < 0 ? i : pos, mappingManager, columnCounter);
176-
mappers.put(mapper.getColumnName(), mapper);
189+
ColumnMapper<T> mapper = factory.createColumnMapper(klass, field, pos < 0 ? i : pos, mappingManager, null);
190+
if (userType.contains(mapper.getColumnName()))
191+
mappers.put(mapper.getColumnName(), mapper);
192+
else
193+
throw new IllegalArgumentException(String.format("Field %s does not exist in type %s.%s",
194+
mapper.getColumnName(), ksName, userType.getTypeName()));
177195
}
178196
return mappers;
179197
}

driver-mapping/src/main/java/com/datastax/driver/mapping/MappedUDTCodec.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ protected T newInstance() {
4646
}
4747
}
4848

49+
Class<T> getUdtClass() {
50+
return udtClass;
51+
}
52+
4953
@Override
5054
protected ByteBuffer serializeField(T source, String fieldName, ProtocolVersion protocolVersion) {
5155
// The parent class passes lowercase names unquoted, but in our internal map of mappers they are always quoted

driver-mapping/src/main/java/com/datastax/driver/mapping/Mapper.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ ListenableFuture<PreparedStatement> getPreparedQueryAsync(QueryType type, EnumMa
141141
return getPreparedQueryAsync(type, Collections.<ColumnMapper<?>>emptySet(), options);
142142
}
143143

144+
Class<T> getMappedClass() {
145+
return klass;
146+
}
147+
144148
/**
145149
* The {@code TableMetadata} for this mapper.
146150
*
@@ -921,7 +925,7 @@ public Type getType() {
921925

922926
static class Ttl extends Option {
923927

924-
private int ttlValue;
928+
private final int ttlValue;
925929

926930
Ttl(int value) {
927931
super(Type.TTL);
@@ -957,7 +961,7 @@ boolean isIncludedInQuery() {
957961

958962
static class Timestamp extends Option {
959963

960-
private long tsValue;
964+
private final long tsValue;
961965

962966
Timestamp(long value) {
963967
super(Type.TIMESTAMP);
@@ -993,7 +997,7 @@ boolean isIncludedInQuery() {
993997

994998
static class ConsistencyLevelOption extends Option {
995999

996-
private ConsistencyLevel cl;
1000+
private final ConsistencyLevel cl;
9971001

9981002
ConsistencyLevelOption(ConsistencyLevel cl) {
9991003
super(Type.CL);
@@ -1029,7 +1033,7 @@ boolean isIncludedInQuery() {
10291033

10301034
static class Tracing extends Option {
10311035

1032-
private boolean tracing;
1036+
private final boolean tracing;
10331037

10341038
Tracing(boolean tracing) {
10351039
super(Type.TRACING);
@@ -1066,7 +1070,7 @@ boolean isIncludedInQuery() {
10661070

10671071
static class SaveNullFields extends Option {
10681072

1069-
private boolean saveNullFields;
1073+
private final boolean saveNullFields;
10701074

10711075
SaveNullFields(boolean saveNullFields) {
10721076
super(SAVE_NULL_FIELDS);

0 commit comments

Comments
 (0)