Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;

import org.apache.horaedb.rpc.interceptors.AuthenticationInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -394,6 +395,8 @@ public void addInterceptor(final ClientInterceptor interceptor) {
// Interceptors run in the reverse order in which they are added
private void initInterceptors() {
// the last one
addInterceptor(new AuthenticationInterceptor(opts.getUser(), opts.getPassword()));

addInterceptor(new MetricInterceptor());

// the second
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
*/
package org.apache.horaedb.rpc.interceptors;

import io.grpc.*;

import java.util.Base64;

public class AuthenticationInterceptor implements ClientInterceptor {
private final String token;

public AuthenticationInterceptor(String user, String password) {
// Build token
this.token = Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOpts, Channel next) {

return new AuthenticationAttachingClientCall<>(next.newCall(method, callOpts), token);
}

private static final class AuthenticationAttachingClientCall<ReqT, RespT>
extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {

private final String token;
private static final String AUTHORIZATION_HEADER = "authorization";
private static final String BASIC_PREFIX = "Basic ";

private AuthenticationAttachingClientCall(ClientCall<ReqT, RespT> delegate, String token) {
super(delegate);
this.token = token;
}

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Metadata.Key.of(AUTHORIZATION_HEADER, Metadata.ASCII_STRING_MARSHALLER), BASIC_PREFIX + token);
super.start(responseListener, headers);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ public static final class Builder {
// The routeMode for sdk, only Proxy and Direct support now.
private RouteMode routeMode;
private String database;
private String user;
private String password;

// Asynchronous thread pool, which is used to handle various asynchronous tasks in the SDK.
private Executor asyncWritePool;
private Executor asyncReadPool;
Expand Down Expand Up @@ -201,6 +204,13 @@ public Builder(Endpoint clusterAddress, RouteMode routeMode) {
this.routeMode = routeMode;
}

@SuppressWarnings("PMD")
public Builder authentication(final String user, final String password) {
this.user = user;
this.password = password;
return this;
}

/**
*
* @param database the database name
Expand Down Expand Up @@ -366,6 +376,10 @@ public HoraeDBOptions build() {
opts.asyncWritePool = asyncWritePool;
opts.asyncReadPool = asyncReadPool;
opts.rpcOptions = this.rpcOptions;

opts.rpcOptions.setUser(this.user);
opts.rpcOptions.setPassword(this.password);

opts.routerOptions = new RouterOptions();
opts.routerOptions.setClusterAddress(this.clusterAddress);
opts.routerOptions.setMaxCachedSize(this.routeTableMaxCachedSize);
Expand Down
28 changes: 28 additions & 0 deletions horaedb-rpc/src/main/java/org/apache/horaedb/rpc/RpcOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@
*/
public class RpcOptions implements Copiable<RpcOptions> {

/**
* Username provided for authentication
*/
private String user;

/**
* Password provided for authentication
*/
private String password;

/**
* RPC request default timeout in milliseconds
* Default: 10000(10s)
Expand Down Expand Up @@ -96,6 +106,14 @@ public class RpcOptions implements Copiable<RpcOptions> {
*/
private long connectionMaxAgeMs = 0;

public String getUser() {
return user;
}

public String getPassword() {
return password;
}

public int getDefaultRpcTimeout() {
return defaultRpcTimeout;
}
Expand Down Expand Up @@ -180,6 +198,14 @@ public LimitKind getLimitKind() {
return limitKind;
}

public void setUser(String user) {
this.user = user;
}

public void setPassword(String password) {
this.password = password;
}

public void setLimitKind(LimitKind limitKind) {
this.limitKind = limitKind;
}
Expand Down Expand Up @@ -235,6 +261,8 @@ public void setLogOnLimitChange(boolean logOnLimitChange) {
@Override
public RpcOptions copy() {
final RpcOptions opts = new RpcOptions();
opts.user = this.user;
opts.password = this.password;
opts.defaultRpcTimeout = this.defaultRpcTimeout;
opts.rpcThreadPoolSize = this.rpcThreadPoolSize;
opts.rpcThreadPoolQueueSize = this.rpcThreadPoolQueueSize;
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,13 @@
<exclude>**/sql/TokenMgrException.java</exclude>
</excludes>
</configuration>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>9.2</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
Expand Down