问题描述
我正在尝试使用 logstash 将数据从 oracle 数据库加载到弹性搜索。使用插件 logstash-filter-aggregate (2.8.0) 进行多个聚合。我使用 plugin-jdbc 从关系数据库中提取数据。 一位教师拥有多个 contact_details 的地方。
我想要这样的结果:
"teachers" : [
{
"tch_name" : "aaa","social_cat" : "art","tch_id" : 201,"contact_details" : [
{
"phone_no": ["1111111111","2222222222"],"email_id: [[email protected],[email protected]]
}
]
}
]
create table mst_school (sch_id integer primary key,udise_sch_code varchar2(50),school_name varchar2(50),update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into mst_school (sch_id,udise_sch_code,school_name) values(1,'100','AESA');
insert into mst_school (sch_id,school_name) values(2,'200','PVP');
create table teacher_profile (teacher_id integer primary key,name varchar2(50),social_category varchar2(50),sch_id integer references mst_school(sch_id),startdate TIMESTAMP,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(201,'aaa','art',1,TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi')); --yyyy-MM-dd-HH:mm
insert into teacher_profile (teacher_id,startdate) values(202,'bbb','math','yyyy-MM-dd-hh24:mi'));
insert into teacher_profile (teacher_id,startdate) values(203,'ccc','phy',2,TO_DATE('2020-10-22-00:00','yyyy-MM-dd-hh24:mi'));
create table village (village_id integer primary key,village_name varchar2(50),latitude number(12,6),longitude number(12,update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into village (village_id,village_name,latitude,longitude) values(500,'Pune',18.5135,73.7699);
insert into village (village_id,longitude) values(600,'Mumbai',19.0760,72.8777);
create table contact_info (teacher_ids integer REFERENCES teacher_profile(teacher_id),phone_no varchar2(20),email_id varchar2(20),update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into contact_info (teacher_ids,phone_no,email_id) values (201,'1111111111','[email protected]');
insert into contact_info (teacher_ids,'2222222222','[email protected]');
insert into contact_info (teacher_ids,email_id) values (202,'3333333333','[email protected]');
insert into contact_info (teacher_ids,email_id) values (203,'4444444444','[email protected]');
select concat(s.udise_sch_code,tch.teacher_id) comp_id,s.sch_id as sch_id,s.udise_sch_code as sch_code,s.school_name as school_name,v.latitude as latitude,v.longitude as longitude,tch.teacher_id as tch_id,tch.name as tch_name,tch.social_category as social_cat,tch.startdate as startdate,c.phone_no as phone_no,v.village_id as village_id,v.village_name as village_name,v.sch_id as vsch_id
from mst_school s
LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
LEFT JOIN village v on s.sch_id = v.sch_id
LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids;
.conf 文件 想要修改配置文件,以便输出将包含一组联系人详细信息。
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:@****:1521/DB19C"
jdbc_driver_library => "/home/user/Downloads/ojdbc8-19.3.0.0.jar"
jdbc_user => "****"
jdbc_password => "****"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
schedule => "* * * * *"
statement => "select concat(s.udise_sch_code,to_char(tch.startdate,'yyyy-MM-dd-HH:mm') as startdate,c.email_id as email_id,v.sch_id as vsch_id
from mst_school s
LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
LEFT JOIN village v on s.sch_id = v.sch_id
LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids"
tracking_column_type => "numeric"
jdbc_paging_enabled => true
jdbc_fetch_size => "500"
charset => "UTF-8"
codec => json
tracking_column => sch_id
last_run_Metadata_path => "/home/user/Desktop/.sch_id_tracker_file"
}
}
filter {
aggregate {
task_id => "%{sch_id}"
code => "
map['comp_id'] = event.get('comp_id')
map['sch_id'] = event.get('sch_id')
map['sch_code'] = event.get('sch_code')
map['teachers'] ||= []
map['teachers'] << {
'tch_id' => event.get('tch_id'),'tch_name' => event.get('tch_name'),'social_cat' => event.get('social_cat')
# Contact details
}
event.cancel()
"
timeout_tags => ["aggregate"]
push_prevIoUs_map_as_event => true
timeout => 3
}
}
output {
elasticsearch {
document_id => "%{sch_id}"
index => "school_index"
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)