Maven dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <flink.version>1.11.2</flink.version>
    <logback.version>1.1.7</logback.version>
    <slf4j.version>1.7.25</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>${logback.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
</dependencies>
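Both test classes below import com.g2.flink.models.CustomerStatusChangedEvent, which the original post never shows. The following is a minimal hypothetical reconstruction, inferred from the DDL columns and the three-argument constructor calls; the field names and the assumption that the second constructor argument is the new status are mine, not the author's.

// Hypothetical sketch of the event POJO; field names and constructor
// argument meanings are inferred from the DDL and the test code below.
package com.g2.flink.models;

public class CustomerStatusChangedEvent {
    private Long customerId;
    private Integer oldStatus;
    private Integer newStatus;
    private Long eventTime;

    public CustomerStatusChangedEvent() {
    }

    // Matches calls such as new CustomerStatusChangedEvent(1010L, 2, baseTimestamp).
    public CustomerStatusChangedEvent(Long customerId, Integer newStatus, Long eventTime) {
        this.customerId = customerId;
        this.newStatus = newStatus;
        this.eventTime = eventTime;
    }

    public Long getCustomerId() { return customerId; }
    public void setCustomerId(Long customerId) { this.customerId = customerId; }

    public Integer getOldStatus() { return oldStatus; }
    public void setOldStatus(Integer oldStatus) { this.oldStatus = oldStatus; }

    public Integer getNewStatus() { return newStatus; }
    public void setNewStatus(Integer newStatus) { this.newStatus = newStatus; }

    public Long getEventTime() { return eventTime; }
    public void setEventTime(Long eventTime) { this.eventTime = eventTime; }
}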
Producer
import com.g2.flink.models.CustomerStatusChangedEvent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

//@Slf4j
public class KafkaTableStreamApiProducerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // legacy Flink planner
                .useBlinkPlanner() // Blink planner
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        // Sample events (unused by the SQL insert below; kept from the original post).
        DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        // customerId is declared BIGINT to match the Long ids used in the Java code.
        String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
                "customerId bigint,\n" +
                "oldStatus int,\n" +
                "newStatus int,\n" +
                "eventTime bigint\n" +
                ") WITH(\n" +
                "'connector.type'='kafka',\n" +
                "'connector.version'='universal',\n" +
                "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
                "'connector.topic'='customer_statusChangedEvent',\n" +
                "'format.type'='json'\n" +
                ")";
        tableEnvironment.executeSql(ddl);

        // Emit one row into the Kafka-backed table every 3 seconds.
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(3);
                int status = (int) (System.currentTimeMillis() % 3);
                String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
                        "values(1001,1," + status + "," + System.currentTimeMillis() + ")";
                tableEnvironment.executeSql(insert);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
}
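The WITH clause above uses the legacy connector.* option keys, which Flink 1.11 still accepts. Flink 1.11 also introduced a shorter option style for the Kafka connector; a sketch of the equivalent declaration follows, reusing the same brokers and topic as above:

// Equivalent DDL in the new option style introduced in Flink 1.11.
String ddlNewStyle = "CREATE TABLE CustomerStatusChangedEvent(\n" +
        "customerId bigint,\n" +
        "oldStatus int,\n" +
        "newStatus int,\n" +
        "eventTime bigint\n" +
        ") WITH(\n" +
        "'connector'='kafka',\n" +
        "'topic'='customer_statusChangedEvent',\n" +
        "'properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
        "'format'='json'\n" +
        ")";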
Consumer
import com.g2.flink.models.CustomerStatusChangedEvent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

//@Slf4j
public class KafkaTableStreamApiConsumerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // legacy Flink planner
                .useBlinkPlanner() // Blink planner
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        // Sample events (unused; kept from the original post).
        DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        // customerId is declared BIGINT to match the Long field in CustomerStatusLog below.
        String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
                "customerId bigint,\n" +
                "oldStatus int,\n" +
                "newStatus int,\n" +
                "eventTime bigint\n" +
                ") WITH(\n" +
                "'connector.type'='kafka',\n" +
                "'connector.version'='universal',\n" +
                "'connector.properties.group.id'='g2_group',\n" +
                "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
                "'connector.topic'='customer_statusChangedEvent',\n" +
                "'connector.startup-mode'='latest-offset',\n" +
                "'format.type'='json'\n" +
                ")";
        tableEnvironment.executeSql(ddl);

        Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
                " from CustomerStatusChangedEvent" +
                " where newStatus in(1,2)"
        );

        DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
        result.print();

        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static class CustomerStatusLog {
        private Long customerId;
        private Integer status;

        public CustomerStatusLog() {
        }

        public Long getCustomerId() {
            return customerId;
        }

        public void setCustomerId(Long customerId) {
            this.customerId = customerId;
        }

        public Integer getStatus() {
            return status;
        }

        public void setStatus(Integer status) {
            this.status = status;
        }

        @Override
        public String toString() {
            return "CustomerStatusLog{" +
                    "customerId=" + customerId +
                    ", status=" + status +
                    '}';
        }
    }
}
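The SQL query in the consumer can also be written with the Table API instead of sqlQuery; a minimal sketch, assuming the same table registered by the DDL above and a static import of org.apache.flink.table.api.Expressions.$:

// Table API equivalent of the sqlQuery above: keep rows whose newStatus
// is 1 or 2, then project customerId and rename newStatus to status.
Table resultTb = tableEnvironment.from("CustomerStatusChangedEvent")
        .where($("newStatus").in(1, 2))
        .select($("customerId"), $("newStatus").as("status"));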
Consumer output (the 4> prefix is the index of the parallel subtask of the print sink that emitted the record):
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}
Flink: implementing a Kafka producer and consumer with SQL
Original post: https://www.cnblogs.com/zhshlimi/p/13725081.html