问题描述
我一直使用 SQL*Loader 向 Oracle 加载外部文件。每个平面文件的字段都以“;”分隔,但在同一个文件中,记录会根据第一个字段的值采用不同的格式。例如 myfile.dat 的内容是:
package org.example.official.sql
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.example.model.Stock
import org.example.sources.StockSource
import org.scalatest.funsuite.AnyFunSuite
/**
 * End-to-end tests for the Flink `upsert-kafka` connector:
 * test 1 writes an aggregated changelog into Kafka, test 2 reads it back,
 * test 3 documents how the raw topic looks from the Kafka console consumer.
 *
 * NOTE: the Table API method is `executeSql` (camelCase); the original
 * `executesql` spelling does not exist and fails to compile.
 */
class UpsertKafkaTest extends AnyFunSuite {

  // Kafka topic shared by the sink test (writer) and the source test (reader).
  val topic = "test-UpsertKafkaTest-1"

  // Test Case 1: aggregate a stream of Stock events and write the running
  // per-id price sum into Kafka through the upsert-kafka sink connector.
  test("write to upsert kafka: upsert-kafka as sink") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    // Deprecated in newer Flink versions (event time is the default there);
    // kept for compatibility with the Flink version this project pins.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val ds: DataStream[Stock] = env.addSource(new StockSource(emitInterval = 1500, print = false))
    ds.print()

    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds)

    // upsert-kafka requires a PRIMARY KEY: key columns are serialized with
    // 'key.format', the remaining columns with 'value.format'.
    val ddl =
      s"""
         |CREATE TABLE sinkTable (
         |  id STRING,
         |  total_price DOUBLE,
         |  PRIMARY KEY (id) NOT ENFORCED
         |) WITH (
         |  'connector' = 'upsert-kafka',
         |  'topic' = '$topic',
         |  'properties.bootstrap.servers' = 'localhost:9092',
         |  'key.format' = 'json',
         |  'value.format' = 'json'
         |)
         |""".stripMargin
    tenv.executeSql(ddl)

    // executeSql(INSERT ...) submits its own Table job; columns are matched
    // by position, so SUM(price) lands in total_price.
    tenv.executeSql(
      """
        |INSERT INTO sinkTable
        |SELECT id, SUM(price)
        |FROM sourceTable
        |GROUP BY id
        |""".stripMargin)

    // Runs the separate DataStream job created by ds.print() above.
    env.execute()
  }

  // Test Case 2: read the changelog back from the same topic via the
  // upsert-kafka source connector and print the retract stream.
  test("read from upsert kafka: upsert-kafka as source 2") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)

    // The original DDL was missing the mandatory upsert-kafka options
    // 'topic', 'properties.bootstrap.servers' and 'key.format', and the
    // total_price column shown in the expected output below; all added here
    // to mirror the sink table of test case 1.
    val ddl =
      s"""
         |CREATE TABLE sourceTable (
         |  id STRING,
         |  total_price DOUBLE,
         |  PRIMARY KEY (`id`) NOT ENFORCED
         |) WITH (
         |  'connector' = 'upsert-kafka',
         |  'topic' = '$topic',
         |  'properties.bootstrap.servers' = 'localhost:9092',
         |  'properties.group.id' = 'testGroup001',
         |  'key.format' = 'json',
         |  'key.json.ignore-parse-errors' = 'true',
         |  'value.format' = 'json',
         |  'value.json.fail-on-missing-field' = 'false',
         |  'value.fields-include' = 'EXCEPT_KEY'
         |)
         |""".stripMargin
    tenv.executeSql(ddl)

    val result = tenv.executeSql(
      """
        |SELECT * FROM sourceTable
        |""".stripMargin)
    // Prints the changelog: each new aggregate value arrives as a
    // retraction (-U) of the old row followed by the new row (+U).
    result.print()
    /*
    +----+--------------------------------+--------------------------------+
    | op |                             id |                    total_price |
    +----+--------------------------------+--------------------------------+
    | +I |                            id1 |                            1.0 |
    | -U |                            id1 |                            1.0 |
    | +U |                            id1 |                            3.0 |
    | -U |                            id1 |                            3.0 |
    | +U |                            id1 |                            6.0 |
    | -U |                            id1 |                            6.0 |
    | +U |                            id1 |                           10.0 |
    | -U |                            id1 |                           10.0 |
    | +U |                            id1 |                           15.0 |
    | -U |                            id1 |                           15.0 |
    | +U |                            id1 |                           21.0 |
    | -U |                            id1 |                           21.0 |
    | +U |                            id1 |                           28.0 |
    | -U |                            id1 |                           28.0 |
    | +U |                            id1 |                           36.0 |
    | -U |                            id1 |                           36.0 |
    | +U |                            id1 |                           45.0 |
    */
  }

  // Test Case 3: documentation only — what the raw topic looks like when
  // consumed with the Kafka console consumer (values only, latest state).
  test("read from upsert kafka with consumer console") {
    /*
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-UpsertKafkaTest-1 --from-beginning
    {"id":"id1","total_price":1.0}
    {"id":"id1","total_price":3.0}
    {"id":"id1","total_price":6.0}
    {"id":"id1","total_price":10.0}
    {"id":"id1","total_price":15.0}
    {"id":"id1","total_price":21.0}
    {"id":"id1","total_price":28.0}
    {"id":"id1","total_price":36.0}
    {"id":"id1","total_price":45.0}
    */
  }
}
通过使用 SQL*Loader,我可以针对不同格式分别编写两个不同的 *.CTL 控制文件,如下所示:
...
A;John;brown
A;Maty;Green
B;car;red;ford
...
我可以通过外部表(external table)实现同样的效果吗?还是说在这种情况下 myfile.dat 的所有记录必须具有相同的格式?我需要根据每条记录第一个字段(type_record)的值,把记录分别加载到不同的表中。
Oracle 10g 版
提前致谢!
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)