Spring WebFlux makes it easy to write a streaming endpoint for your application. One of the options is Server-Sent-Events. Most of the required work is done behind the scenes by Spring Webflux magic, you just need to return a Flux producing the required objects and specify the correct content type. It’s as easy as…

Take one, naive

@RestController
public class StreamingController {
    @Autowired
    private final PersonService personService;

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<PersonDto> stream() {
        return personSevice.personDtosFlux();
    }
}

We just return a Flux and specify text/event-stream content type as the produced one. Spring Webflux will automatically convert each PersonDto instance to text (by default, it’s JSON).

There you go, you can subscribe to /stream endpoint in your browser, and it will get a stream of PersonDto, each of them serialized as JSON.

The output could look like the following:

data: {"name":"John"}

data: {"name":"Maria"}

You write a ‘sunny day scenario’ test, and everything works as expected.

    @Test
    void streamsCollection() {
        when(personService.personDtosFlux()
                .thenReturn(Flux.just(new PersonDto("John"), new PersonDto("Maria")));
    
        List<PersonDto> dtos = webClient.get().uri("/stream")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .exchange()
                .expectStatus().is2xxSuccessful()
                .expectBodyList(PersonDto.class)
                .returnResult()
                .getResponseBody();
    
        assertThatJohnAndMariaAreReturned(dtos);
    }

But what if an exception occur somewhere at the database level?

You model this situation with .thenReturn(Flux.error(new RuntimeException("Oops))) and… you don’t see anything ‘erroneous’ in your test output! How come?

Well, when a stream channel between a user agent and a server is already established, the response is already committed, so you just cannot change the response code when an exception is encountered inside the reactive channel (represented with a Flux), so you just don’t get any data after the error. More than that, even if Spring Webflux is potentially able to detect just before establishing the channel that the Flux is erroneous, it refuses to do so and just ignores the failure. I don’t know whether it is a bug or it is an intended behavior.

The bottomline is that if we want to somehow signal to the client that some error have happened, we have to use the established channel for this. To do it, we can just return an error message instead of a DTO. To allow the clien1t to distinguish the erroneous case from the normal one, we can use ‘event’ mechanism of the Server-Sent-Events.

To do it, we could employ ServerSentEvent class.

Take two, error handling

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Object>> stream() {
        return personSevice.personDtosFlux()
                .map(this::dtoToSse)
                .onErrorResume(e -> Mono.just(throwableToSse(e)));
    }

    private ServerSentEvent<Object> dtoToSse(PersonDto dto) {
        return ServerSentEvent.builder()
                .data(dto)
                .build();
    }

    private ServerSentEvent<Object> throwableToSse(Throwable e) {
        return ServerSentEvent.builder().event("internal-error")
                .data(e.getMessage())
                .build();
    }

Here, we explicitly specify what to return in the case of an error. If it happans, we produce an event called ‘internal-error’.

Here is how the output could look like:

event: internal-error
data: Oops

Conclusion

It has been demonstrated how you can handle errors in a Server-Sent-Events stream in SpringWebFlux.

How do you test such a controller? The following article explains the matter.