1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package flink.demo.time
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector
/**
 * Demo job for event-time processing.
 *
 * Reads `key,timestamp` text lines from a socket on `localhost:9192`, uses the
 * second field as the event timestamp, groups records by key into 3-second
 * tumbling event-time windows and prints one summary line per fired window.
 */
object FlinkTimeJob {

  /**
   * Parses one input line of the form `key,timestamp`.
   *
   * Fields after the second one are ignored, as before. Surrounding whitespace
   * on the timestamp field (for example a trailing `\r` sent by some socket
   * clients) is trimmed before conversion.
   *
   * @param line raw text line received from the socket
   * @return `Some((key, timestamp))` when the line has at least two
   *         comma-separated fields and the second one is a valid `Long`;
   *         `None` otherwise
   */
  private def parseRecord(line: String): Option[(String, Long)] =
    line.split(",") match {
      case Array(key, rawTimestamp, _*) =>
        scala.util.Try(rawTimestamp.trim.toLong).toOption.map(timestamp => (key, timestamp))
      case _ =>
        None
    }

  /**
   * Builds the streaming pipeline and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val checkpointIntervalMs = 600 * 1000L
    val minPauseBetweenCheckpointsMs = 500L
    val checkpointTimeoutMs = 60000L
    val watermarkIntervalMs = 2000L

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Checkpointing: exactly-once, one checkpoint in flight at a time, and
    // externalized checkpoints are kept when the job is cancelled.
    env.enableCheckpointing(checkpointIntervalMs, CheckpointingMode.EXACTLY_ONCE)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setMinPauseBetweenCheckpoints(minPauseBetweenCheckpointsMs)
    checkpointConfig.setCheckpointTimeout(checkpointTimeoutMs)
    checkpointConfig.setMaxConcurrentCheckpoints(1)
    checkpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    env.getConfig.setAutoWatermarkInterval(watermarkIntervalMs)

    // NOTE(review): BoundedOutOfOrdernessWatermarksWithLog is defined elsewhere in
    // the project; presumably it tolerates 10 seconds of out-of-orderness and logs
    // the watermarks it emits - confirm against its definition.
    val watermarkStrategy = WatermarkStrategy
      .forGenerator[(String, Long)](_ => new BoundedOutOfOrdernessWatermarksWithLog(Duration.ofSeconds(10)))
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
        // The event timestamp is the second tuple field, taken from the input line.
        override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
      })

    env.socketTextStream("localhost", 9192, '\n')
      // flatMap instead of map: a malformed line is reported and dropped instead
      // of throwing inside the operator and failing the whole job.
      .flatMap { (line: String) =>
        val parsed = parseRecord(line)
        if (parsed.isEmpty) {
          System.err.println(s"Skipping malformed record: $line")
        }
        parsed.toList
      }
      .assignTimestampsAndWatermarks(watermarkStrategy)
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply((key, window, iterable, collector: Collector[String]) => {
        val start = window.getStart
        val end = window.getEnd
        val windowElementConcat = iterable.mkString(",")
        // NOTE(review): DateTimeUtil.toDateTime is a project helper; presumably it
        // renders epoch milliseconds as a readable date-time - confirm.
        val res = s"key: $key, Window: [$start, $end), [${DateTimeUtil.toDateTime(start)}, ${DateTimeUtil.toDateTime(end)}), window elements: $windowElementConcat"
        collector.collect(res)
      })
      .print()

    env.execute("flink demo")
  }
}
|