Skip to content

Commit

Permalink
opt: client supports requests to server-streaming endpoints without p…
Browse files Browse the repository at this point in the history
…arameters (#15029)

* optimize:  support http1 automatic keepalive setting

* optimize:  support http1 automatic keepalive setting

* optimize:  support http1 automatic keepalive setting

* optimize:  support http1 automatic keepalive setting

* feat:  server stream supports requests without parameters

* feat:  server stream supports requests without parameters

* fix ut

* fix ut

* fix ut

* fix ut

* update

* update

* update
  • Loading branch information
funky-eyes authored Jan 6, 2025
1 parent 8e49668 commit 82274d9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public interface MethodDescriptor {

Object getAttribute(String key);

Class<?>[] getActualRequestTypes();

Class<?> getActualResponseType();

enum RpcType {
UNARY,
CLIENT_STREAM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,17 @@ private RpcType determineRpcType() {
boolean returnIsVoid = returnClass.getName().equals(void.class.getName());
if (returnIsVoid && parameterClasses.length == 1 && isStreamType(parameterClasses[0])) {
actualRequestTypes = Collections.emptyList().toArray(new Class<?>[0]);
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) method.getGenericParameterTypes()[0]).getActualTypeArguments()[0]);
return RpcType.SERVER_STREAM;
}
if (returnIsVoid
&& parameterClasses.length == 2
&& !isStreamType(parameterClasses[0])
&& isStreamType(parameterClasses[1])) {
actualRequestTypes = parameterClasses;
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) method.getGenericParameterTypes()[1]).getActualTypeArguments()[0]);
return RpcType.SERVER_STREAM;
}
if (Arrays.stream(parameterClasses).anyMatch(this::isStreamType) || isStreamType(returnClass)) {
Expand All @@ -119,13 +124,6 @@ private boolean isStreamType(Class<?> classType) {
return StreamObserver.class.isAssignableFrom(classType);
}

private static Class<?> obtainActualTypeInStreamObserver(Type typeInStreamObserver) {
return (Class<?>)
(typeInStreamObserver instanceof ParameterizedType
? ((ParameterizedType) typeInStreamObserver).getRawType()
: typeInStreamObserver);
}

@Override
public String getMethodName() {
return methodName;
Expand Down Expand Up @@ -179,14 +177,23 @@ public Object getAttribute(String key) {
return this.attributeMap.get(key);
}

@Override
public Class<?>[] getActualRequestTypes() {
return actualRequestTypes;
}

@Override
public Class<?> getActualResponseType() {
return actualResponseType;
}

private Class<?> obtainActualTypeInStreamObserver(Type typeInStreamObserver) {
return (Class<?>)
(typeInStreamObserver instanceof ParameterizedType
? ((ParameterizedType) typeInStreamObserver).getRawType()
: typeInStreamObserver);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ public Object getAttribute(String key) {
return this.attributeMap.get(key);
}

@Override
public Class<?>[] getActualRequestTypes() {
return this.parameterClasses;
}

@Override
public Class<?> getActualResponseType() {
return this.returnClass;
}

@Override
public Pack getRequestPack() {
return requestPack;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,9 @@ public ReflectionPackableMethod(
switch (method.getRpcType()) {
case CLIENT_STREAM:
case BI_STREAM:
actualRequestTypes = new Class<?>[] {
obtainActualTypeInStreamObserver(
((ParameterizedType) method.getMethod().getGenericReturnType()).getActualTypeArguments()[0])
};
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) method.getMethod().getGenericParameterTypes()[0])
.getActualTypeArguments()[0]);
break;
case SERVER_STREAM:
actualRequestTypes = method.getMethod().getParameterTypes();
actualResponseType = obtainActualTypeInStreamObserver(
((ParameterizedType) method.getMethod().getGenericParameterTypes()[1])
.getActualTypeArguments()[0]);
actualRequestTypes = method.getActualRequestTypes();
actualResponseType = method.getActualResponseType();
break;
case UNARY:
actualRequestTypes = method.getParameterClasses();
Expand Down Expand Up @@ -411,6 +401,9 @@ public byte[] pack(Object obj) throws IOException {
for (String type : argumentsType) {
builder.addArgTypes(type);
}
if (actualRequestTypes == null || actualRequestTypes.length == 0) {
return builder.build().toByteArray();
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
for (int i = 0; i < arguments.length; i++) {
Object argument = arguments[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,20 @@ private static boolean isSync(MethodDescriptor methodDescriptor, Invocation invo

AsyncRpcResult invokeServerStream(MethodDescriptor methodDescriptor, Invocation invocation, ClientCall call) {
RequestMetadata request = createRequest(methodDescriptor, invocation, null);
StreamObserver<Object> responseObserver =
(StreamObserver<Object>) invocation.getArguments()[1];
final StreamObserver<Object> requestObserver = streamCall(call, request, responseObserver);
requestObserver.onNext(invocation.getArguments()[0]);
Object[] arguments = invocation.getArguments();
final StreamObserver<Object> requestObserver;
if (arguments.length == 2) {
StreamObserver<Object> responseObserver = (StreamObserver<Object>) arguments[1];
requestObserver = streamCall(call, request, responseObserver);
requestObserver.onNext(invocation.getArguments()[0]);
} else if (arguments.length == 1) {
StreamObserver<Object> responseObserver = (StreamObserver<Object>) arguments[0];
requestObserver = streamCall(call, request, responseObserver);
requestObserver.onNext(null);
} else {
throw new IllegalStateException(
"The first parameter must be a StreamObserver when there are no parameters, or the second parameter must be a StreamObserver when there are parameters");
}
requestObserver.onCompleted();
return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);
}
Expand Down

0 comments on commit 82274d9

Please sign in to comment.