F:\SourceCode\flink-1.3.1 > bin\start-local.bat Starting Flink job manager. Webinterface by default on http://localhost:8081/. Don't close this batch window. Stop job manager by pressing Ctrl+C.
6. 启动Kafka单节点集群
启动Zookeeper:
1 2
cd F:\SourceCode\zookeeper > bin\zkServer.cmd
启动Kafka broker:
1 2 3
> cd F:\SourceCode\kafka_1 > set JMX_PORT=9999 > bin\windows\kafka-server-start.bat F:\\SourceCode\\configs\\server.properties
public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> { @Nullable @Override public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) { if (lastElement != null && lastElement.contains(",")) { String[] parts = lastElement.split(","); return new Watermark(Long.parseLong(parts[0])); } return null; }
@Override public long extractTimestamp(String element, long previousElementTimestamp) { if (element != null && element.contains(",")) { String[] parts = element.split(","); return Long.parseLong(parts[0]); } return 0L; } }
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 非常关键,一定要设置启动检查点!! env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "flink-group");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props); consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());
/** * Get current free memory size in bytes * @return free RAM size */ public static long currentFreeMemorySizeInBytes() { return mxBean.getFreePhysicalMemorySize(); } }
> bin\flink.bat run -c huxihx.KafkaMessageStreaming F:\\Projects\\flink-kafka-demo\\build\\libs\\flink-kafka-demo-``1.0``-SNAPSHOT.jar test F:\\temp\result.txt