Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I'm setting up a slow-changing lookup Map in my Apache-Beam pipeline. It continuously updates the lookup map. For each key in lookup map, I retrieve the latest value in the global window with accumulating mode. But it always meets Exception :

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this snippet code?

If I use .discardingFiredPanes() instead, I will lose information in the last emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap());

Example Input Trigger:

t1 : KV<k1,v1> KV< k2,v2>
t2 : KV<k1,v1>

accumulatingFiredPanes => expected result at t2 => KV(k1,v1), KV(k2,v2) but failed due to duplicated exception

discardingFiredPanes => expected result at t2 => KV(k1,v1) Success

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
1.0k views
Welcome To Ask or Share your Answers For Others

1 Answer

Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

548k questions

547k answers

4 comments

86.3k users

...