Here’s a complete working example of using Spring WebClient with Flux
to continuously publish and subscribe to a streaming endpoint — a common scenario in event-driven systems or reactive APIs like SSE (Server-Sent Events).
Imagine a service that publishes data (e.g., sensor readings) every few seconds, and another service that subscribes to this stream and processes the data reactively.
@RestController
public class EventPublisherController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> publishEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(i -> "Event #" + i)
.log();
}
}
Flux
@Component
public class EventSubscriber {
private final WebClient webClient;
public EventSubscriber(WebClient.Builder builder) {
this.webClient = builder.baseUrl("http://localhost:8080").build();
}
@PostConstruct
public void subscribeToStream() {
webClient.get()
.uri("/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.subscribe(event -> {
System.out.println("Received: " + event);
});
}
}
@PostConstruct
starts the subscription when the app boots.
@SpringBootApplication
public class StreamingApplication {
public static void main(String[] args) {
SpringApplication.run(StreamingApplication.class, args);
}
}
When the application starts, you’ll see:
Received: Event #0
Received: Event #1
Received: Event #2
...
retry()
and onErrorResume()
to make the subscriber resilient.publishOn(Schedulers.boundedElastic())
for async processing if the handling is heavy.limitRate()
, buffering, etc.) for high-frequency streams.Would you like this example packaged as a GitHub project or with integration testing using MockWebServer
?