

Like I say in the title, I want to receive the last windowedBy messages when the producer stops sending messages. At the moment I am doing it manually, but first of all, a little description.

I have a Kafka producer that reads lines from a file (every line is a different JSON); every line read is sent to Kafka with a 500 ms period between sends.

I have a consumer that consumes all the JSONs sent by the producer. The code:

final KStream transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde))
groupBy(this::groupedByTimeStampAndProtocolName)
this::processNewRecord, //new TransactionAggregator(),
suppress(Suppressed.untilWindowCloses(()))

I have the expected functionality, I mean, it receives all the records, but to receive the last windowed messages I must send records manually. When the producer sends the last record (the 120th JSON), it won't send any more records. I have seen that I must send 3 records to process the last window (I am using 2.5.0), and it doesn't matter whether I wait some time or not. It isn't clear to me why I must send 3 records. Is there any way to auto-process the last window?

The properties used in the Kafka consumer are:

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId() + Constants.APP_ID)
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass())
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class)

And on the producer side:

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
properties.put(ProducerConfig.ACKS_CONFIG, "all")
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ())

As you are using suppress() (with the untilWindowCloses config), the operator will only emit a final result when "stream-time" advances. "Stream-time" is computed as a function over the record timestamps, so if no records are processed, "stream-time" does not advance and suppress() will never emit anything. Thus, sending more records is the only way "stream-time" can be advanced.
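To make the stream-time behaviour concrete, here is a tiny pure-Java model of event-time-driven suppression. This is only an illustration of the concept, not Kafka's actual implementation; the 10-second window size and all class/field names are assumptions for the sketch:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model: "stream-time" is the maximum record timestamp seen so far.
// A suppressed window result is emitted only once stream-time reaches the window end,
// which is why wall-clock waiting alone never releases the last window.
public class StreamTimeModel {
    static final long WINDOW_MS = 10_000;             // assumed window size
    long streamTime = Long.MIN_VALUE;                 // advances only when records arrive
    final TreeMap<Long, Integer> openWindows = new TreeMap<>(); // windowStart -> record count
    final List<Long> emitted = new ArrayList<>();               // starts of closed windows

    void process(long ts) {
        streamTime = Math.max(streamTime, ts);        // stream-time never moves backwards
        long start = (ts / WINDOW_MS) * WINDOW_MS;    // tumbling-window assignment
        openWindows.merge(start, 1, Integer::sum);
        // "untilWindowCloses": emit only windows whose end has been passed by stream-time
        Iterator<Map.Entry<Long, Integer>> it = openWindows.entrySet().iterator();
        while (it.hasNext()) {
            long s = it.next().getKey();
            if (s + WINDOW_MS <= streamTime) { emitted.add(s); it.remove(); }
        }
    }

    public static void main(String[] args) {
        StreamTimeModel m = new StreamTimeModel();
        m.process(1_000);                 // window [0, 10000) opens
        m.process(9_500);                 // stream-time = 9500 < 10000: nothing emitted
        System.out.println(m.emitted);    // []
        // No matter how long we wait in wall-clock time, nothing changes:
        // stream-time is frozen at 9500 until a new record arrives.
        m.process(12_000);                // stream-time jumps to 12000: [0, 10000) closes
        System.out.println(m.emitted);    // [0]
    }
}
```

The last call shows the effect described in the answer: only a newer record's timestamp, never the passage of wall-clock time, closes the pending window.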

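Given that only new records advance stream-time, a common workaround for "auto-processing" the last window is to keep producing periodic dummy/heartbeat records into the input topic after the real data ends. Below is a minimal sketch of the scheduling side only, with the actual Kafka send abstracted behind a callback; in a real application the callback would wrap producer.send(...), and the class and parameter names here are illustrative, not from the original post:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Periodically invokes a "send heartbeat" callback so downstream stream-time
// keeps advancing even when no real records are being produced.
public class HeartbeatSender {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // onHeartbeat receives the current wall-clock timestamp; in a real app it
    // would send a marker record, e.g. producer.send(new ProducerRecord<>(topic, key, heartbeatJson)).
    public void start(long periodMs, LongConsumer onHeartbeat) {
        scheduler.scheduleAtFixedRate(
                () -> onHeartbeat.accept(System.currentTimeMillis()),
                0, periodMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```

Note that the heartbeat records must actually reach the suppress() operator for its observed stream-time to advance, so the streams application needs a convention for recognizing and ignoring the heartbeat payload inside the aggregation rather than filtering it out upstream.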