Skip to content
Merged
Prev Previous commit
Next Next commit
fix java
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jul 19, 2022
commit 37e32e488c22ef52755dda1e6e40a23b210a31a2
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,84 @@
/** Feast Serving properties. */
public class ApplicationProperties {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ApplicationProperties.class);
private FeastProperties feast;
private GrpcServer grpc;
private RestServer rest;

public static class FeastProperties {
/* Feast Serving build version */
@NotBlank private String version = "unknown";
public FeastProperties getFeast() {
return feast;
}

public void setRegistry(String registry) {
this.registry = registry;
public void setFeast(FeastProperties feast) {
this.feast = feast;
}

public GrpcServer getGrpc() {
return grpc;
}

public void setGrpc(GrpcServer grpc) {
this.grpc = grpc;
}

public RestServer getRest() {
return rest;
}

public void setRest(RestServer rest) {
this.rest = rest;
}

/**
* Validates all FeastProperties. This method runs after properties have been initialized and
* individually and conditionally validates each class.
*/
@PostConstruct
public void validate() {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
Validator validator = factory.getValidator();

// Validate root fields in FeastProperties
Set<ConstraintViolation<ApplicationProperties>> violations = validator.validate(this);
if (!violations.isEmpty()) {
throw new ConstraintViolationException(violations);
}
}

public enum StoreType {
REDIS,
REDIS_CLUSTER;
}

public static class FeastProperties {
/* Feast Serving build version */
@NotBlank private String version = "unknown";
@NotBlank private String registry;
@NotBlank private String project;
private int registryRefreshInterval;
private int entityKeySerializationVersion;
/** Name of the active store configuration (only one store can be active at a time). */
@NotBlank private String activeStore;
/**
* Collection of store configurations. The active store is selected by the "activeStore" field.
*/
@JsonMerge(OptBoolean.FALSE)
private List<Store> stores = new ArrayList<>();
/* Metric tracing properties. */
private TracingProperties tracing;
/* Feast Audit Logging properties */
@NotNull private LoggingProperties logging;
private String gcpProject;
private String awsRegion;
private String transformationServiceEndpoint;

public String getRegistry() {
return registry;
}

@NotBlank private String project;
public void setRegistry(String registry) {
this.registry = registry;
}

public String getProject() {
return project;
Expand All @@ -63,8 +125,6 @@ public void setProject(final String project) {
this.project = project;
}

private int registryRefreshInterval;

public int getRegistryRefreshInterval() {
return registryRefreshInterval;
}
Expand All @@ -73,6 +133,14 @@ public void setRegistryRefreshInterval(int registryRefreshInterval) {
this.registryRefreshInterval = registryRefreshInterval;
}

public int getEntityKeySerializationVersion() {
return entityKeySerializationVersion;
}

public void setEntityKeySerializationVersion(int entityKeySerializationVersion) {
this.entityKeySerializationVersion = entityKeySerializationVersion;
}

/**
* Finds and returns the active store
*
Expand All @@ -92,25 +160,6 @@ public void setActiveStore(String activeStore) {
this.activeStore = activeStore;
}

/** Name of the active store configuration (only one store can be active at a time). */
@NotBlank private String activeStore;

/**
* Collection of store configurations. The active store is selected by the "activeStore" field.
*/
@JsonMerge(OptBoolean.FALSE)
private List<Store> stores = new ArrayList<>();

/* Metric tracing properties. */
private TracingProperties tracing;

/* Feast Audit Logging properties */
@NotNull private LoggingProperties logging;

public void setStores(List<Store> stores) {
this.stores = stores;
}

/**
* Gets Serving store configuration as a list of {@link Store}.
*
Expand All @@ -120,6 +169,10 @@ public List<Store> getStores() {
return stores;
}

public void setStores(List<Store> stores) {
this.stores = stores;
}

/**
* Gets Feast Serving build version.
*
Expand All @@ -129,10 +182,6 @@ public String getVersion() {
return version;
}

public void setTracing(TracingProperties tracing) {
this.tracing = tracing;
}

/**
* Gets tracing properties
*
Expand All @@ -142,6 +191,10 @@ public TracingProperties getTracing() {
return tracing;
}

public void setTracing(TracingProperties tracing) {
this.tracing = tracing;
}

/**
* Gets logging properties
*
Expand All @@ -151,8 +204,6 @@ public LoggingProperties getLogging() {
return logging;
}

private String gcpProject;

public String getGcpProject() {
return gcpProject;
}
Expand All @@ -161,17 +212,13 @@ public void setGcpProject(String gcpProject) {
this.gcpProject = gcpProject;
}

public void setAwsRegion(String awsRegion) {
this.awsRegion = awsRegion;
}

private String awsRegion;

public String getAwsRegion() {
return awsRegion;
}

private String transformationServiceEndpoint;
public void setAwsRegion(String awsRegion) {
this.awsRegion = awsRegion;
}

public String getTransformationServiceEndpoint() {
return transformationServiceEndpoint;
Expand All @@ -182,16 +229,6 @@ public void setTransformationServiceEndpoint(String transformationServiceEndpoin
}
}

private FeastProperties feast;

public void setFeast(FeastProperties feast) {
this.feast = feast;
}

public FeastProperties getFeast() {
return feast;
}

/** Store configuration class for database that this Feast Serving uses. */
public static class Store {

Expand Down Expand Up @@ -327,30 +364,6 @@ public void setServer(Server server) {
}
}

private GrpcServer grpc;
private RestServer rest;

public GrpcServer getGrpc() {
return grpc;
}

public void setGrpc(GrpcServer grpc) {
this.grpc = grpc;
}

public RestServer getRest() {
return rest;
}

public void setRest(RestServer rest) {
this.rest = rest;
}

public enum StoreType {
REDIS,
REDIS_CLUSTER;
}

/** Trace metric collection properties */
public static class TracingProperties {

Expand Down Expand Up @@ -417,20 +430,4 @@ public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}

/**
* Validates all FeastProperties. This method runs after properties have been initialized and
* individually and conditionally validates each class.
*/
@PostConstruct
public void validate() {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
Validator validator = factory.getValidator();

// Validate root fields in FeastProperties
Set<ConstraintViolation<ApplicationProperties>> violations = validator.validate(this);
if (!violations.isEmpty()) {
throw new ConstraintViolationException(violations);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public ServingServiceV2 registryBasedServingServiceV2(
new OnlineRetriever(
applicationProperties.getFeast().getProject(),
redisClusterClient,
new EntityKeySerializerV2());
new EntityKeySerializerV2(
applicationProperties.getFeast().getEntityKeySerializationVersion()));
break;
case REDIS:
RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig());
Expand All @@ -57,7 +58,8 @@ public ServingServiceV2 registryBasedServingServiceV2(
new OnlineRetriever(
applicationProperties.getFeast().getProject(),
redisClient,
new EntityKeySerializerV2());
new EntityKeySerializerV2(
applicationProperties.getFeast().getEntityKeySerializationVersion()));
break;
default:
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
// https://github.com/feast-dev/feast/blob/b1ccf8dd1535f721aee8bea937ee38feff80bec5/sdk/python/feast/infra/key_encoding_utils.py#L22
// and must be kept up to date with any changes in that logic.
public class EntityKeySerializerV2 implements EntityKeySerializer {
private final int entityKeySerializationVersion;

public EntityKeySerializerV2() {
this(1);
}

public EntityKeySerializerV2(int entityKeySerializationVersion) {
this.entityKeySerializationVersion = entityKeySerializationVersion;
}

@Override
public byte[] serialize(RedisProto.RedisKeyV2 entityKey) {
Expand Down Expand Up @@ -83,7 +92,11 @@ public byte[] serialize(RedisProto.RedisKeyV2 entityKey) {
we use `struct.pack("<l", v.int64_val)` to get the bytes of an int64 val. This actually extracts only 4 bytes,
instead of 8 bytes as you'd expect from to serialize an int64 value.
*/
buffer.addAll(encodeInteger(((Long) val.getInt64Val()).intValue()));
if (this.entityKeySerializationVersion <= 1) {
buffer.addAll(encodeInteger(((Long) val.getInt64Val()).intValue()));
} else {
buffer.addAll(encodeLong(((Long) val.getInt64Val())));
}

break;
default:
Expand Down Expand Up @@ -113,6 +126,14 @@ private List<Byte> encodeInteger(Integer value) {
return Arrays.asList(ArrayUtils.toObject(buffer.array()));
}

private List<Byte> encodeLong(Long value) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putInt(value);

return Arrays.asList(ArrayUtils.toObject(buffer.array()));
}

private List<Byte> encodeString(String value) {
byte[] stringBytes = value.getBytes(StandardCharsets.UTF_8);
return encodeBytes(stringBytes);
Expand Down