Problem description
I am sending data from an MQTT publisher to my MQTT broker and consuming it in Kafka:
mosquitto_pub -h 0.0.0.0 -p 1883 -t temperature -m '{"who":"ben","timeepoc":1558212482,"lat":-33.87052833,"lon":151.21292,"alt":31.0,"batt":0,"speed":12.86}'
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
Then I create a stream and a table in ksql:
create stream carsensor (who VARCHAR,batt INTEGER,lon DOUBLE,lat DOUBLE,timeepoc BIGINT,alt INTEGER,speed DOUBLE)
with (kafka_topic = 'temperature',value_format='JSON');
CREATE table runner_status with (value_format='JSON') AS
select who,min(speed) as min_speed,max(speed) as max_speed,min(GEO_DISTANCE(lat,lon,-33.87014,151.211945,'km')) as dist_to_finish,count(*) as num_events
from carsensor WINDOW TUMBLING (size 5 minute)
group by who;
This is the data in my table runner_status:
{
"ROWTIME": 1597418628366,"ROWKEY": "ben","WINDOWSTART": 1597418400000,"WINDOWEND": 1597418700000,"WHO": "ben","MIN_SPEED": 12.91,"MAX_SPEED": 12.91,"DIST_TO_FINISH": 0.07441178137496719,"NUM_EVENTS": 2
}
This is the data in carsensor:
{
"ROWTIME": 1597418628366,"ROWKEY": "temperature","BATT": 0,"LON": 151.21273,"LAT": -33.87029167,"TIMEEPOC": 1558212492,"ALT": 31,"SPEED": 12.91
}
Then I create a connector for Elasticsearch:
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_M WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://localhost:9200',
  'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter' = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081',
  'type.name' = '_doc',
  'topics' = 'carsensor',
  'key.ignore' = 'false',
  'schema.ignore' = 'false',
  'transforms' = 'ExtractTimestamp',
  'transforms.ExtractTimestamp.type' = 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.ExtractTimestamp.timestamp.field' = 'EVENT_TS'
);
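But I can't see any index in Kibana, and I want to plot the positions on a map.
Solution
I found the solution, haha. Because my data is plain JSON, the connector needs the JSON converter with schemas disabled, plus key.ignore and schema.ignore set to true. I could also only create the sink from the temperature topic, not from both: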
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_x WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url' = 'http://localhost:9200',
  'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'type.name' = '_doc',
  'topics' = 'temperature',
  'key.ignore' = 'true',
  'schema.ignore' = 'true'
);
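To confirm that the connector above actually wrote something before opening Kibana, a quick check against Elasticsearch can help. The following is only a sketch: it assumes Elasticsearch is reachable at the connection.url used above, and that the sink writes to an index named after the topic (temperature), which I believe is the connector's default. The helper names indices_url and sample_url and the RUN_CHECKS switch are made up for this example.

```shell
#!/bin/sh
# Assumption: same Elasticsearch address as 'connection.url' in the connector.
ES_URL="http://localhost:9200"
# Assumption: the sink names the index after the Kafka topic.
INDEX="temperature"

# URL that lists all indices, with a header row (?v).
indices_url() { printf '%s/_cat/indices?v\n' "$ES_URL"; }

# URL that returns one sample document from the index.
sample_url() { printf '%s/%s/_search?size=1&pretty\n' "$ES_URL" "$INDEX"; }

# Only hit the network when explicitly asked (hypothetical switch),
# so the helpers can be sourced without a running cluster.
if [ "${RUN_CHECKS:-0}" = "1" ]; then
  curl -s "$(indices_url)"
  curl -s "$(sample_url)"
fi
```

Saved under a hypothetical name such as check_index.sh, it can be run with RUN_CHECKS=1 sh check_index.sh. If temperature does not appear in the index list, the connector status is the next place to look.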