I want to write a reactive endpoint that always shows the newest messages of a Redis stream.
The entities look like this: {'key' : 'some_key', 'status' : 'some_string'}.
So I would like to have the following result:
- The page is opened and displays, for instance, one entity:
{'key' : 'abc', 'status' : 'status_A'}
The page stays open.
- Then a new entity is added to the stream:
XADD mystream * key abc status status_B
- Now I would like to see every item of the stream without refreshing the tab:
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}
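To make the expected result concrete, here is a minimal sketch of what I would expect the open connection to receive, assuming the standard server-sent events framing (one `data:` line per event, terminated by a blank line). The class `SseFrameSketch`, its nested `Light` stand-in, and the hand-built JSON are hypothetical illustrations, not the code of my application or of Spring.

```java
import java.util.Arrays;
import java.util.List;

final class SseFrameSketch {
    // Hypothetical stand-in for the Light entity used in this question.
    static final class Light {
        final String key;
        final String status;
        Light(String key, String status) { this.key = key; this.status = status; }
    }

    // Renders one entity as a single server-sent event.
    // The JSON is built by hand and does no escaping; it is for illustration only.
    static String toSseFrame(Light light) {
        String json = "{\"key\":\"" + light.key + "\",\"status\":\"" + light.status + "\"}";
        return "data:" + json + "\n\n";
    }

    public static void main(String[] args) {
        List<Light> lights = Arrays.asList(
                new Light("abc", "status_A"),
                new Light("abc", "status_B"));
        // Each entity becomes its own event; the connection stays open between them.
        for (Light light : lights) {
            System.out.print(toSseFrame(light));
        }
    }
}
```

The second event is what should arrive after the XADD, while the tab is still open.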
When I mock this behavior, it works and I get the expected output:
@GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Light> liveLightMock() {
    List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
            new Light("key", "on") , new Light("key", "off"),
            new Light("key", "on") , new Light("key", "off"),
            new Light("key", "on") , new Light("key", "off"),
            new Light("key", "on") , new Light("key", "off"));
    return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
}
The individual elements of the list are displayed one after another, with a 500 ms delay between items.
However, when I read from Redis instead of the mocked variant, it no longer works. I am testing the parts one after another: first the save function (1) must work; if it does, displaying existing records without reactive features (2) must work; and finally, once both work, I need to get the reactive part going.
Maybe you can help me get the reactive part working. I have been working on it for days without any progress.
Thanks :)
Test 1) - Saving Function (Short Version)
Looks like it's working.
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
    String status = (++statusIdx % 2 == 0) ? "on" : "off";
    Light light = new Light(Consts.LIGHT_ID, status);
    return LightRepository.save(light).flux();
}

@Override
public Mono<Light> save(Light light) {
    Map<String, String> lightMap = new HashMap<>();
    lightMap.put("key", light.getKey());
    lightMap.put("status", light.getStatus());
    return operations.opsForStream(redisSerializationContext)
            .add("mystream", lightMap)
            .map(__ -> light);
}
Test 2) - Loading/Reading Function (Short Version)
Seems to be working, but not reactively: I added a new entity while a web view was open. The view showed all existing items but did not update once I added new ones. After reloading, I saw every item.
How can I get getLights to return something that works with TEXT_EVENT_STREAM_VALUE and subscribes to the stream?
@Override
public Flux<Object> getLights() {
    ReadOffset readOffset = ReadOffset.from("0");
    StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
    Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
        Map<Object, Object> kvp = entries.getValue();
        String key = (String) kvp.get("key");
        String status = (String) kvp.get("status");
        Light light = new Light(key, status);
        return Flux.just(light);
    };
    return operations.opsForStream()
            .read(offset)
            .flatMap(mapFunc);
}

@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
    return LightRepository.getLights();
}
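To illustrate what I suspect is going on, here is a minimal sketch of the difference between a one-shot read and a reader that keeps following the stream. It rests on my assumption (which may be wrong) that read(offset) only returns the entries present at the time of the call and then completes. Everything in it is a hypothetical in-memory stand-in: `StreamTailSketch`, `Entry`, `Tail`, `readAfter` and `poll` are made-up names, and no Redis or Spring calls are involved.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamTailSketch {
    // In-memory stand-in for a stream entry with a strictly increasing id.
    static final class Entry {
        final long id;
        final String status;
        Entry(long id, String status) { this.id = id; this.status = status; }
    }

    private final List<Entry> entries = new ArrayList<>();
    private long nextId = 1;

    // Rough analogue of XADD: append an entry and return its id.
    long add(String status) {
        long id = nextId++;
        entries.add(new Entry(id, status));
        return id;
    }

    // Rough analogue of a non-blocking read: entries with id > afterId at call time.
    List<Entry> readAfter(long afterId) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : entries) {
            if (e.id > afterId) result.add(e);
        }
        return result;
    }

    // A tailing reader remembers the last id it delivered and asks only for newer entries.
    static final class Tail {
        private final StreamTailSketch stream;
        private long lastSeenId = 0;
        Tail(StreamTailSketch stream) { this.stream = stream; }

        List<Entry> poll() {
            List<Entry> fresh = stream.readAfter(lastSeenId);
            if (!fresh.isEmpty()) lastSeenId = fresh.get(fresh.size() - 1).id;
            return fresh;
        }
    }

    public static void main(String[] args) {
        StreamTailSketch stream = new StreamTailSketch();
        stream.add("off");
        List<Entry> snapshot = stream.readAfter(0);   // one-shot read, like my getLights()
        Tail tail = new Tail(stream);
        System.out.println("first poll: " + tail.poll().size());
        stream.add("on");                             // added while the "page" is open
        System.out.println("snapshot still has: " + snapshot.size());
        System.out.println("second poll: " + tail.poll().size());
    }
}
```

The snapshot never grows, which matches what I see in the browser, while the tailing reader picks up the later entry. The Spring Data Redis documentation describes a StreamReceiver for continuous consumption, which might be the piece I am missing, but I have not gotten it to work yet.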
Test 1) - Saving Function (Long Version)
The endpoint and the save function are part of different classes.
String status = (++statusIdx % 2 == 0) ? "on" : "off";
flip-flops the status from on to off, to on, to off, ...
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
    String status = (++statusIdx % 2 == 0) ? "on" : "off";
    Light light = new Light(Consts.LIGHT_ID, status);
    return LightRepository.save(light).flux();
}

@Override
public Mono<Light> save(Light light) {
    Map<String, String> lightMap = new HashMap<>();
    lightMap.put("key", light.getKey());
    lightMap.put("status", light.getStatus());
    return operations.opsForStream(redisSerializationContext)
            .add("mystream", lightMap)
            .map(__ -> light);
}
To validate the functions, I:
- Deleted the stream to empty it:
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0
- Called the creation endpoint /light/create twice.
I expected the stream to now have two items, one with status = on and one with off:
127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1610456865517-0"
         2) 1) "key"
            2) "light_1"
            3) "status"
            4) "off"
      2) 1) "1610456866708-0"
         2) 1) "key"
            2) "light_1"
            3) "status"
            4) "on"
It looks like the saving part is working.
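The order of the two entries (first off, then on) follows from the pre-increment in the status expression. A minimal sketch, assuming statusIdx is an int field that starts at 0 (the class name `StatusToggleSketch` and the method `nextStatus` are hypothetical; only the expression itself is taken from my endpoint):

```java
final class StatusToggleSketch {
    // Assumption: the counter starts at 0, as an uninitialized int field would.
    private int statusIdx = 0;

    // Same expression as in createTestLight(): increment first, then test for evenness.
    String nextStatus() {
        return (++statusIdx % 2 == 0) ? "on" : "off";
    }

    public static void main(String[] args) {
        StatusToggleSketch toggle = new StatusToggleSketch();
        System.out.println(toggle.nextStatus()); // off (statusIdx is now 1)
        System.out.println(toggle.nextStatus()); // on  (statusIdx is now 2)
    }
}
```

Note that a plain int counter like this is not safe under concurrent requests; for my two manual calls that does not matter.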
Test 2) - Loading/Reading Function (Long Version)
Seems to be working, but not reactively (by reactive I mean: I add a new entity and the open page updates its values).
@Override
public Flux<Object> getLights() {
    ReadOffset readOffset = ReadOffset.from("0");
    StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
    Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
        Map<Object, Object> kvp = entries.getValue();
        String key = (String) kvp.get("key");
        String status = (String) kvp.get("status");
        Light light = new Light(key, status);
        return Flux.just(light);
    };
    return operations.opsForStream()
            .read(offset)
            .flatMap(mapFunc);
}

@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
    return LightRepository.getLights();
}
My test steps:
1) Call /light/live -> I should see N entries. If I can see entries, the normal (non-reactive) display is working.
2) Call /light/create twice -> the live view should have gained 2 entries -> N+2 entries.
3) Wait 1 minute, just to be safe.
4) The view should show N+2 entries if the reactive part is working.
5) Refresh the view from 1) (/light/live); it should still show the same number of entries if the reactive part works.
Displaying the information (1) works, and the adding part (2) worked, which I checked in the terminal. Step 4) did not work: the open view did not show the new entries.
Ergo, the display is working, but it is not reactive.
After I refreshed the browser (5), I got the expected N+2 entries, so (2) worked as well.