上一篇文章中已经介绍了服务器端如何改造以支持HTTP2.0 + protobuf,并且给了一个客户端实现的例子,但这个例子并没有与SpringBoot结合。比如能否让RestTemplate或WebClient支持HTTP2.0 + protobuf,下面就给出代码:
1、RestTemplate
public static void doWithRestTemplate(ReqtObj reqt) {
String url = "http://127.0.0.1:8080/object/doTest";
RestTemplate restTemplate = new RestTemplate();
restTemplate.getMessageConverters().add(new ProtoBufHttpMessageConverter());
HttpHeaders httpHeaders = new HttpHeaders();
// httpHeaders.add("Content-Type", "application/json"); // #1
// httpHeaders.add("Accept", "application/json"); // #2
httpHeaders.add("Content-Type", "application/protobuf"); // #3
httpHeaders.add("Accept", "application/protobuf"); // #4
HttpEntity<ReqtObj> httpEntity = new HttpEntity<>(reqt, httpHeaders);
ResponseEntity<RespObj> respEntity = restTemplate.exchange(url,
HttpMethod.POST, httpEntity, RespObj.class);
RespObj reqtObj = respEntity.getBody();
System.out.println("++++++++++++++++++++" + reqtObj.toString());
}
上面的代码使RestTemplate能够支持protobuf格式的报文,但由于RestTemplate先天的局限性,只能支持HTTP/1.1,不能支持HTTP/2.0。
另外,将#3#4注释掉后,上面的代码就可以直接变回采用JSON报文了。
2、WebClient
package com.cebbank;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ProtoBufMessageWriter implements HttpMessageWriter<Object> {
private static final List<MediaType> MEDIA_TYPES = Collections
.unmodifiableList(Arrays.asList(new MediaType("application", "protobuf")));
@Override
public List<MediaType> getWritableMediaTypes() {
return MEDIA_TYPES;
}
@Override
public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
return true;
}
@Override
public Mono<Void> write(Publisher<? extends Object> inputStream, ResolvableType elementType, MediaType mediaType,
ReactiveHttpOutputMessage message, Map<String, Object> hints) {
Flux<DataBuffer> dataBufferFlux = Flux.from(inputStream).map(value -> {
byte[] data = ProtoBufTools.serialize(value);
DataBufferFactory bufferFactory = message.bufferFactory();
DataBuffer buffer = bufferFactory.allocateBuffer(data.length);
buffer.write(data);
return buffer;
});
return message.writeWith(dataBufferFlux);
}
}
package com.cebbank;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ProtoBufMessageReader implements HttpMessageReader<Object> {
private static final List<MediaType> MEDIA_TYPES = Collections
.unmodifiableList(Arrays.asList(new MediaType("application", "protobuf")));
@Override
public List<MediaType> getReadableMediaTypes() {
return MEDIA_TYPES;
}
@Override
public boolean canRead(ResolvableType elementType, MediaType mediaType) {
return true;
}
@Override
public Flux<Object> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
Mono<Object> mono = createObject(message);
return mono.flux();
}
@Override
public Mono<Object> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
Map<String, Object> hints) {
Mono<Object> mono = createObject(message);
return mono;
}
private Mono<Object> createObject(ReactiveHttpInputMessage message) {
Mono<Object> mono = message.getBody().collectList().map(list -> {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
for (DataBuffer dataBuffer : list) {
int len = dataBuffer.readableByteCount();
byte[] buffer = new byte[len];
dataBuffer.read(buffer);
bos.write(buffer);
}
byte[] data = bos.toByteArray();
return ProtoBufTools.deserialize(data, RespObj.class);
} catch (Exception e) {
e.printStackTrace();
}
return null;
});
return mono;
}
}
public static void doWithWebClientUseReactor(ReqtObj reqt) {
String url = "http://127.0.0.1:8080/object/doTest";
try {
HttpClient httpClient = HttpClient.create().protocol(HttpProtocol.H2C);
ReactorClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(httpClient);
WebClient webClient = WebClient.builder()//
.clientConnector(reactorClientHttpConnector)//
.codecs(configurer -> {
configurer.customCodecs().register(new ProtoBufMessageReader());
configurer.customCodecs().register(new ProtoBufMessageWriter());
})//
.build();
System.out.println("======================" + reqt.toString());
System.out.println("reqtLen=" + data.length);
MediaType mediaType = new MediaType("application", "protobuf");
// MediaType mediaType = new MediaType("application", "json");
Mono<RespObj> RespObjMono = webClient.post().uri(url).contentType(mediaType).accept(mediaType)
.bodyValue(reqt).retrieve().bodyToMono(RespObj.class);
RespObj resp = RespObjMono.block();
System.out.println("======================" + resp.toString());
} catch (Throwable e) {
e.printStackTrace();
}
}
通过实现ProtoBufMessageReader和ProtoBufMessageWriter,就可以让WebClient支持HTTP2.0 + protobuf了。