License | Open source |
Size | 40.11MB |
Language | Java |
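ksqlDB lets you define materialized views over streams and tables. Materialized views are defined by what is known as a "persistent query". These queries are called persistent because they maintain their incrementally updated results in a table.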
CREATE TABLE hourly_metrics AS
SELECT url, COUNT(*)
FROM page_views
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY url EMIT CHANGES;
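To make "incrementally updated results" concrete, here is a minimal in-memory sketch, assuming events carry an epoch-millisecond timestamp, of the state a query like hourly_metrics maintains: one COUNT(*) row per (url, one-hour tumbling window). It is plain Java 16+ (records) and only simulates the semantics. PageView, WindowKey and count are hypothetical names, not ksqlDB APIs, and this is not how ksqlDB itself is implemented.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory simulation of the hourly_metrics table:
 * one COUNT(*) row per (url, tumbling-window start). Not ksqlDB's implementation.
 */
public class TumblingCountSketch {

    /** Hypothetical page-view event: the URL plus an event timestamp in epoch ms. */
    public record PageView(String url, long timestampMs) {}

    /** Hypothetical table key: the GROUP BY column plus the window the event falls into. */
    public record WindowKey(String url, long windowStartMs) {}

    public static final long ONE_HOUR_MS = 60L * 60 * 1000;

    /** Tumbling windows are fixed-size and non-overlapping: [k * size, (k + 1) * size). */
    public static long windowStart(long timestampMs, long sizeMs) {
        return Math.floorDiv(timestampMs, sizeMs) * sizeMs;
    }

    /** Folds events into the table; each event updates exactly one row. */
    public static Map<WindowKey, Long> count(List<PageView> views, long sizeMs) {
        Map<WindowKey, Long> table = new HashMap<>();
        for (PageView v : views) {
            WindowKey key = new WindowKey(v.url(), windowStart(v.timestampMs(), sizeMs));
            table.merge(key, 1L, Long::sum); // incremental update of the existing count
        }
        return table;
    }

    public static void main(String[] args) {
        List<PageView> views = List.of(
                new PageView("http://myurl.com", 0L),
                new PageView("http://myurl.com", ONE_HOUR_MS - 1), // same hour as above
                new PageView("http://myurl.com", ONE_HOUR_MS));    // first event of the next hour
        Map<WindowKey, Long> table = count(views, ONE_HOUR_MS);
        System.out.println(table.get(new WindowKey("http://myurl.com", 0L)));          // 2
        System.out.println(table.get(new WindowKey("http://myurl.com", ONE_HOUR_MS))); // 1
    }
}
```

The key choice is the point: because the window start is part of the key, a late-hour event and an early-next-hour event land in different rows even though they are one millisecond apart.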
Results can be "pulled" from materialized views on demand with SELECT queries. The following query returns a single row:
SELECT * FROM hourly_metrics
WHERE URL = 'http://myurl.com' AND WINDOWSTART = '2019-11-20T19:00';
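Conceptually, a pull query like this one is a point lookup on the view's current state: it returns at most one row and then completes. The Java 16+ sketch below simulates that. The WindowKey record, the upsert/pull methods, and the assumption that the WINDOWSTART literal is read as UTC are all hypothetical and for illustration only.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical simulation of a pull query: a key lookup against the current
 * state of a materialized view. Not a ksqlDB client API.
 */
public class PullQuerySketch {

    /** Hypothetical primary key of hourly_metrics: (url, window start in epoch ms). */
    public record WindowKey(String url, long windowStartMs) {}

    private final Map<WindowKey, Long> state = new HashMap<>();

    /** Called by the (simulated) persistent query whenever a count changes. */
    public void upsert(WindowKey key, long count) {
        state.put(key, count);
    }

    /** Analogue of: SELECT * FROM hourly_metrics WHERE URL = ? AND WINDOWSTART = ? */
    public Optional<Long> pull(String url, long windowStartMs) {
        return Optional.ofNullable(state.get(new WindowKey(url, windowStartMs)));
    }

    public static void main(String[] args) {
        // Assumption: the '2019-11-20T19:00' literal is interpreted as UTC.
        long start = Instant.parse("2019-11-20T19:00:00Z").toEpochMilli();
        PullQuerySketch view = new PullQuerySketch();
        view.upsert(new WindowKey("http://myurl.com", start), 5L);
        System.out.println(view.pull("http://myurl.com", start));  // Optional[5]
        System.out.println(view.pull("http://other.com", start));  // Optional.empty
    }
}
```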
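Results can also be continuously "pushed" to clients with streaming SELECT queries. The following streaming query pushes every incremental change made to the materialized view to the client:
SELECT * FROM hourly_metrics EMIT CHANGES;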
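By contrast with a pull query, a push query does not read the state once. It delivers each change as the view is updated. The Java 16+ sketch below simulates this with a subscriber callback. Change, subscribe and onEvent are hypothetical names. In this simplified version, a subscriber only sees changes made after it subscribes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical simulation of push-query semantics: every incremental change
 * to the view is delivered to subscribers as it happens. Not a ksqlDB API.
 */
public class PushQuerySketch {

    /** One emitted change: the row's key and its new count. */
    public record Change(String key, long newCount) {}

    private final Map<String, Long> state = new HashMap<>();
    private final List<Consumer<Change>> subscribers = new ArrayList<>();

    /** Analogue of SELECT * FROM hourly_metrics EMIT CHANGES: receive all future changes. */
    public void subscribe(Consumer<Change> subscriber) {
        subscribers.add(subscriber);
    }

    /** Processes one input event, updates the count, and pushes the updated row. */
    public void onEvent(String key) {
        long updated = state.merge(key, 1L, Long::sum);
        Change change = new Change(key, updated);
        for (Consumer<Change> s : subscribers) {
            s.accept(change);
        }
    }

    public static void main(String[] args) {
        PushQuerySketch view = new PushQuerySketch();
        view.subscribe(c -> System.out.println(c));
        view.onEvent("http://myurl.com"); // Change[key=http://myurl.com, newCount=1]
        view.onEvent("http://myurl.com"); // Change[key=http://myurl.com, newCount=2]
    }
}
```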
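Streaming queries run indefinitely until they are explicitly terminated.
The following persistent query joins the clickstream with the users table and keeps only actions by users whose level is 'Platinum':
CREATE STREAM vip_actions AS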
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id
WHERE u.level = 'Platinum' EMIT CHANGES;
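The per-event logic of the vip_actions query can be sketched in Java 16+ as a stream-table join. Each click looks up the user's current row, and then the filter is applied. The record and method names are hypothetical, and this is an in-memory illustration rather than ksqlDB's join implementation. One detail is worth noticing. Clicks with no matching user get a null level under the LEFT JOIN, and the WHERE condition then drops them. The sketch mirrors that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of: clickstream LEFT JOIN users, WHERE level = 'Platinum'.
 * The users table is modeled as a map holding each user's latest row.
 */
public class StreamTableJoinSketch {

    public record Click(String userid, String page, String action) {}
    public record User(String userId, String level) {}
    public record VipAction(String userid, String page, String action) {}

    public static List<VipAction> vipActions(List<Click> clicks, Map<String, User> usersById) {
        List<VipAction> out = new ArrayList<>();
        for (Click c : clicks) {
            User u = usersById.get(c.userid());            // null when there is no match (left join)
            String level = (u == null) ? null : u.level();
            if ("Platinum".equals(level)) {                 // a null level never passes the filter
                out.add(new VipAction(c.userid(), c.page(), c.action()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, User> users = Map.of(
                "u1", new User("u1", "Platinum"),
                "u2", new User("u2", "Gold"));
        List<Click> clicks = List.of(
                new Click("u1", "/home", "view"),
                new Click("u2", "/home", "view"),
                new Click("u3", "/cart", "add"));           // no matching user row
        System.out.println(vipActions(clicks, users));
        // [VipAction[userid=u1, page=/home, action=view]]
    }
}
```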
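ksqlDB is a good fit for identifying patterns or anomalies in real-time data. Because the stream is processed as data arrives, unusual events can be identified and surfaced with millisecond latency. For example, the following query flags card numbers with more than three authorization attempts in a five-second tumbling window: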
CREATE TABLE possible_fraud AS
SELECT card_number, count(*)
FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY card_number
HAVING count(*) > 3 EMIT CHANGES;
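The Java 16+ sketch below shows the semantics of possible_fraud on a finished batch of events. Attempts are counted per (card, five-second window), and then HAVING discards rows whose count is not above three. Attempt, CardWindow and possibleFraud are hypothetical names. Note one limitation: a real streaming query emits an updated row as each attempt arrives. This batch version only shows the final state of each window.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical batch simulation of possible_fraud:
 * COUNT(*) per card per 5-second tumbling window, then HAVING count > 3.
 */
public class FraudWindowSketch {

    public record Attempt(String cardNumber, long timestampMs) {}
    public record CardWindow(String cardNumber, long windowStartMs) {}

    public static final long WINDOW_MS = 5_000L;
    public static final long THRESHOLD = 3L;

    public static Map<CardWindow, Long> possibleFraud(List<Attempt> attempts) {
        Map<CardWindow, Long> counts = new HashMap<>();
        for (Attempt a : attempts) {
            long start = Math.floorDiv(a.timestampMs(), WINDOW_MS) * WINDOW_MS;
            counts.merge(new CardWindow(a.cardNumber(), start), 1L, Long::sum);
        }
        // HAVING filters aggregated rows, i.e. it runs after counting.
        counts.values().removeIf(c -> c <= THRESHOLD);
        return counts;
    }

    public static void main(String[] args) {
        List<Attempt> attempts = List.of(
                new Attempt("c1", 0L), new Attempt("c1", 1_000L),
                new Attempt("c1", 2_000L), new Attempt("c1", 4_999L), // 4 in window [0, 5000)
                new Attempt("c1", 5_000L),                             // next window
                new Attempt("c2", 0L), new Attempt("c2", 1L), new Attempt("c2", 2L));
        System.out.println(possibleFraud(attempts));
        // {CardWindow[cardNumber=c1, windowStartMs=0]=4}
    }
}
```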
CREATE TABLE error_counts AS
SELECT error_code, count(*)
FROM monitoring_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
WHERE type = 'ERROR'
GROUP BY error_code EMIT CHANGES;
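The error_counts query differs from possible_fraud in where its filter runs. WHERE type = 'ERROR' removes rows before they reach the aggregate, whereas HAVING filters the aggregated results. The Java 16+ sketch below simulates error_counts with one-minute tumbling windows to show that ordering. LogEvent, CodeWindow and errorCounts are hypothetical names, and the sketch is not ksqlDB's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of error_counts: filter with WHERE first,
 * then COUNT(*) per error_code per 1-minute tumbling window.
 */
public class ErrorCountSketch {

    public record LogEvent(String type, String errorCode, long timestampMs) {}
    public record CodeWindow(String errorCode, long windowStartMs) {}

    public static final long WINDOW_MS = 60_000L;

    public static Map<CodeWindow, Long> errorCounts(List<LogEvent> events) {
        Map<CodeWindow, Long> counts = new HashMap<>();
        for (LogEvent e : events) {
            if (!"ERROR".equals(e.type())) {
                continue; // WHERE: non-error rows never reach the aggregate
            }
            long start = Math.floorDiv(e.timestampMs(), WINDOW_MS) * WINDOW_MS;
            counts.merge(new CodeWindow(e.errorCode(), start), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<CodeWindow, Long> counts = errorCounts(List.of(
                new LogEvent("ERROR", "E500", 0L),
                new LogEvent("WARN", "E500", 10L),      // filtered out by WHERE
                new LogEvent("ERROR", "E500", 59_999L),
                new LogEvent("ERROR", "E500", 60_000L)));
        System.out.println(counts.get(new CodeWindow("E500", 0L)));      // 2
        System.out.println(counts.get(new CodeWindow("E500", 60_000L))); // 1
    }
}
```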
CREATE STREAM clicks_transformed AS
SELECT userid, page, action
FROM clickstream c
LEFT JOIN users u ON c.userid = u.user_id EMIT CHANGES;
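Rather than simply sending all continuous query output to a Kafka topic, it is often very useful to route that output into another data store. ksqlDB's Kafka Connect integration makes this pattern easy. The following statement creates a sink connector that continuously writes the clicks_transformed topic to Elasticsearch:
CREATE SINK CONNECTOR es_sink WITH (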
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'topics' = 'clicks_transformed',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'type.name' = '',
'connection.url' = 'http://elasticsearch:9200');