Skip to content

Commit

Permalink
Merge pull request #252 from ZhiHanZ/client-lb
Browse files Browse the repository at this point in the history
feat: support client side load balancing
  • Loading branch information
ZhiHanZ authored Aug 19, 2024
2 parents 01f8527 + da0b3bb commit d63eebb
Show file tree
Hide file tree
Showing 23 changed files with 1,091 additions and 80 deletions.
48 changes: 48 additions & 0 deletions .github/actions/setup_databend_cluster/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: "Setup Stateful Cluster Linux"
description: "Running stateful tests in cluster mode"
inputs:
version:
description: "query and meta service version"
required: true
default: "1.2.616-nightly"
target:
description: ""
required: true
default: "x86_64-unknown-linux-gnu"
runs:
using: "composite"
steps:

- name: Minio Setup for (ubuntu-latest only)
shell: bash
run: |
docker run -d --network host --name minio \
-e "MINIO_ACCESS_KEY=minioadmin" \
-e "MINIO_SECRET_KEY=minioadmin" \
-e "MINIO_ADDRESS=:9900" \
-v /tmp/data:/data \
-v /tmp/config:/root/.minio \
minio/minio server /data
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_EC2_METADATA_DISABLED=true
aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket
- name: Download binary and extract into target directory
shell: bash
run: |
wget --progress=bar:force:noscroll https://github.com/datafuselabs/databend/releases/download/v${{ inputs.version }}/databend-v${{ inputs.version }}-${{ inputs.target }}.tar.gz
mkdir -p ./databend
tar -xzvf databend-v${{ inputs.version }}-${{ inputs.target }}.tar.gz -C ./databend
rm databend-v${{ inputs.version }}-${{ inputs.target }}.tar.gz
- name: Start Databend Cluster
shell: bash
run: |
chmod +x ./databend/bin/databend-meta
chmod +x ./databend/bin/databend-query
chmod +x ./scripts/wait_tcp.py
chmod +x ./scripts/deploy/deploy_cluster.sh
./scripts/deploy/deploy_cluster.sh
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ jobs:
curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'
- name: Run Maven clean deploy with release profile
run: mvn clean test
run: mvn test -DexcludedGroups=cluster
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
39 changes: 39 additions & 0 deletions .github/workflows/test_cluster.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Databend Cluster Tests

on:
push:
branches:
- main
- master
pull_request:
branches:
- main
- master

jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.ref }}

- name: Set up JDK 17
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '17'
cache: 'maven'
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} # Value of the GPG private key to import
gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.616-nightly'
target: 'x86_64-unknown-linux-gnu'
- name: Run Maven clean deploy with release profile
run: mvn clean test
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public interface DatabendClient extends Closeable {
void close();

DatabendSession getSession();
String getHost();
Map<String, String> getAdditionalHeaders();
QueryResults getResults();
// execute Restful query request for the first time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import javax.annotation.concurrent.ThreadSafe;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -156,7 +158,11 @@ private boolean executeInternal(Request request, OptionalLong materializedJsonSi
try {
response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, materializedJsonSizeLimit);
} catch (RuntimeException e) {
throw new RuntimeException("Query failed: " + e.getMessage());
if (e.getCause() instanceof ConnectException) {
// specially handle connection refused exception for retry purpose.
throw e;
}
throw new RuntimeException("Query failed: " + e.getMessage(), e);
}

if ((response.getStatusCode() == HTTP_OK) && response.hasValue() && (response.getValue().getError() == null)) {
Expand Down Expand Up @@ -260,6 +266,11 @@ public DatabendSession getSession() {
return databendSession.get();
}

@Override
public String getHost() {
return this.host;
}

@Override
public void close() {
closeQuery();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,13 @@ public void setAutoCommit(boolean autoCommit) {
}

public static final class Builder {
private URI host;
private String database;
private final AtomicBoolean autoCommit = new AtomicBoolean(false);
private Map<String, String> settings;

// txn
private String txnState;

public Builder setHost(URI host) {
this.host = host;
return this;
}

public Builder setDatabase(String database) {
this.database = database;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.databend.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.UncheckedExecutionException;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
Expand All @@ -26,6 +27,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.util.Optional;
import java.util.OptionalLong;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,6 +52,23 @@ public void testBasicQueryPagination() {
cli.close();
}

@Test(groups = {"it"})
public void testConnectionRefused() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
ClientSettings settings = new ClientSettings("http://localhost:13191");

try {
DatabendClient cli = new DatabendClientV1(client, "select 1", settings, null);
cli.getResults(); // This should trigger the connection attempt
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
System.out.println(e.getMessage());
Assert.assertTrue(
e instanceof ConnectException || e.getCause() instanceof ConnectException, "Exception should be IOException or contain IOException as cause");

}
}

@Test(groups = {"it"})
public void testBasicQueryIDHeader() {
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(OkHttpUtils.basicAuthInterceptor("databend", "databend")).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public final class ConnectionProperties {
public static final ConnectionProperty<String> WAREHOUSE = new Warehouse();
public static final ConnectionProperty<String> SSL_MODE = new SSLMode();
static final ConnectionProperty<String> TENANT = new Tenant();
public static final ConnectionProperty<Integer> MAX_FAILOVER_RETRY = new MaxFailoverRetry();
public static final ConnectionProperty<String> LOAD_BALANCING_POLICY = new LoadBalancingPolicy();
public static final ConnectionProperty<String> DATABASE = new Database();
public static final ConnectionProperty<String> ACCESS_TOKEN = new AccessToken();

Expand Down Expand Up @@ -50,6 +52,7 @@ public final class ConnectionProperties {
.add(WAREHOUSE)
.add(SSL_MODE)
.add(TENANT)
.add(LOAD_BALANCING_POLICY)
.add(DATABASE)
.add(ACCESS_TOKEN)
.add(PRESIGNED_URL_DISABLED)
Expand Down Expand Up @@ -140,6 +143,19 @@ public Tenant() {
}
}

private static class MaxFailoverRetry extends AbstractConnectionProperty<Integer> {
public MaxFailoverRetry() {
super("max_failover_retry", Optional.of("0"), NOT_REQUIRED, ALLOWED, INTEGER_CONVERTER);
}
}

private static class LoadBalancingPolicy
extends AbstractConnectionProperty<String> {
public LoadBalancingPolicy() {
super("load_balancing_policy", Optional.of("disabled"), NOT_REQUIRED, ALLOWED, STRING_CONVERTER);
}
}

private static class AccessToken
extends AbstractConnectionProperty<String> {
public AccessToken() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.databend.jdbc;

import java.net.URI;
import java.util.List;
import java.util.Random;

public class DatabendClientLoadBalancingPolicy {
static class DisabledPolicy extends DatabendClientLoadBalancingPolicy {
@Override
public String toString() {
return "Disabled";
}
// do nothing
}
static class RandomPolicy extends DatabendClientLoadBalancingPolicy {
@Override
protected URI pickUri(String query_id, DatabendNodes nodes) {
List<URI> uris = nodes.getUris();

if (uris == null || uris.isEmpty()) {
return null;
}

// Generate a deterministic integer based on the query_id
int deterministicValue = getQueryHash(query_id);

// Use the deterministic value to select a URI
int index = Math.abs(deterministicValue) % uris.size();

return uris.get(index);
}

@Override
public String toString() {
return "Random";
}
}

static class RoundRobinPolicy extends DatabendClientLoadBalancingPolicy {
@Override
protected URI pickUri(String query_id, DatabendNodes nodes) {
List<URI> uris = nodes.getUris();

if (uris == null || uris.isEmpty()) {
return null;
}

// Use round robin to select a URI
int index = nodes.index.getAndUpdate(v -> v + 1 >= uris.size() ? 0 : v + 1);

return uris.get(index);
}

@Override
public String toString() {
return "RoundRobin";
}
}
/**
* Policy that disable load balance and always use the first node.
*/
public static final String DISABLED = "disabled";
/**
* Policy to pick a node randomly from the list of available nodes.
*/
public static final String RANDOM = "random";

/**
* Policy to pick a node using Round Robin Algorithm
*/
public static final String ROUND_ROBIN = "round_robin";


static DatabendClientLoadBalancingPolicy create(String name) {
DatabendClientLoadBalancingPolicy policy;
if (RANDOM.equalsIgnoreCase(name)) {
policy = new RandomPolicy();
} else if (ROUND_ROBIN.equalsIgnoreCase(name)) {
policy = new RoundRobinPolicy();
} else if (DISABLED.equalsIgnoreCase(name)) {
policy = new DisabledPolicy();
} else {
throw new IllegalArgumentException("Unknown load balancing policy: " + name);
}
return policy;
}


/**
* Policy to pick a node based on the least loaded algorithm.
* @param nodes the list of URIs to choose from
* @return the URI to use
*/
protected URI pickUri(String query_id, DatabendNodes nodes) {
if (nodes == null || nodes.getUris() == null || nodes.getUris().isEmpty()) {
return null;
}
return nodes.getUris().get(0);
}


/**
* Get int hash value of given query id
* @param query_id the query id used for choosing load balancing node
* @return hash value of the query id
*/
private static int getQueryHash(String query_id) {
if (query_id.isEmpty()) {
return 0;
}
int hash = 202011; // Using the seed value
for (char c : query_id.toCharArray()) {
hash = hash * 31 + c;
}
return hash;
}

}
Loading

0 comments on commit d63eebb

Please sign in to comment.