将 PostgreSQL CopyManager copyIn 与 COPY FROM STDIN 一起使用时,它似乎什么都不做

问题描述

我正在尝试按照文档中的建议将 Postgresql CopyManager copyIn 功能copY FROM STDIN 一起使用,以便非常快速地从 InputStream 复制到数据库表中。我正在考虑使用它来连续流式传输要在我接收/处理一个表时写入表的行。然而,下面的快速和肮脏的示例代码似乎卡在 copyIn 上并且没有写入表格。

有人知道我在这里遗漏了什么,或者我的理解有误吗?

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.copyManager;

public class PGConnectTest {

    public static void main(String[] args) {

        try {
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb","someadmin","somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            String frmtStr = "%s\t{\"id\":%s,\"somefield\":\"%s\"}\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr,i,("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            bw.write("\n");
                            bw.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        copyManager copyManager = new copyManager(pgcon);
                        String copysql = "copY dcm.testtbl FROM STDIN";
                        executorService.submit(() -> copyManager.copyIn(copysql,br));
                        Thread.sleep(10000);
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    throw e;
                }
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

testtbl的架构如下,

create table testtbl (
id  integer primary key,jsnclm  jsonb
)

控制台输出是(它不会返回并且需要使用 CTRL+C 来杀死它),

C:\Users\ml410408\Documents\Useful Lookups\POSTGREsql>java -cp ".;postgresql-42.2.18.jar" PGConnectTest
org.postgresql.jdbc.PgConnection@41975e01
1       {"id":1,"somefield":"row1"}
2       {"id":2,"somefield":"row2"}
3       {"id":3,"somefield":"row3"}
4       {"id":4,"somefield":"row4"}
5       {"id":5,"somefield":"row5"}
6       {"id":6,"somefield":"row6"}
7       {"id":7,"somefield":"row7"}
8       {"id":8,"somefield":"row8"}
9       {"id":9,"somefield":"row9"}
WRITTEN!
QUITTING

更新:

一旦我将 copY sql 命令的格式从认 TEXT 更改为 CSV 并传入 csv 记录,它不再卡住但什么也不做(意味着表中没有记录),即使它返回与以前不同。

import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.copyManager;

public class PGConnectTest {

    public static void main(String[] args) {

        try {
                try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb","somepassword");
                    BaseConnection pgcon = (BaseConnection)connection;
                    PipedInputStream is = new PipedInputStream();
                    BufferedReader br = new BufferedReader(new InputStreamReader(is));
                    PipedOutputStream os = new PipedOutputStream(is);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                        ExecutorService executorService = Executors.newSingleThreadExecutor();
                        Callable callable = () -> {
                            Thread.sleep(3000);
                            String frmtStr = "%s,'{\"id\":%s,\"somefield\":\"%s\"}'\n";
                            String row = null;
                            for(int i=1; i<10; i++) {
                                row = String.format(frmtStr,("row"+i));
                                System.out.print(row);
                                bw.write(row);
                            }
                            bw.write("\n");
                            bw.write("'\\.'\n");
                            System.out.println("'\\.'\n");
                            bw.flush();
                            os.flush();
                            System.out.println("WRITTEN!");
                            return true;
                        };
                        executorService.submit(callable);
                        System.out.println(connection);
                        copyManager copyManager = new copyManager(pgcon);
                        String copysql = "copY dcm.testtbl FROM STDIN FORMAT CSV DELIMITER ','";
                        executorService.submit(() -> copyManager.copyIn(copysql,br));
                        Thread.sleep(5000);
                        System.out.println(br.ready());
                        while (br.ready()) {
                            System.out.println("LINE : " + br.readLine());
                        }
                        executorService.shutdown();
                        System.out.println("QUITTING");
                } catch (Exception e) {
                    throw e;
                }
                System.out.println("QUITTING FINALLY");
        } catch(Exception ex) {
            System.out.println(ex);
        }

    }

}

谢谢

解决方法

那里似乎有几个不同的问题。

  • 程序挂起是因为 ExecutorService 中的线程让它保持活动状态;提交任务后调用 shutdown() 会导致它按预期终止。
  • 没有写入任何内容的主要原因是 copyIn() 抛出异常:流中的尾随换行符 (bw.write("\n")) 触发了 ERROR: invalid input syntax for integer: "",因为它无法找到 {{1 }} 列。

即便如此,由于资源清理的时间安排,看起来这仍然会受到一些竞争条件的影响。 id 调用将阻塞,直到到达其 copyIn() 的末尾,在 InputStream 的情况下,“结束”是 PipedInputStream 关闭的点.但是在关闭流并解除 PipedOutputStream 调用的阻塞后,输入流和数据库连接将快速连续关闭,可能在副本有机会完成之前。充其量,它似乎成功提交到表,但随后出现“取消复制操作时数据库连接失败”的错误。

要确保这些资源在仍在使用时不会被释放:

  • 等待作者完成
  • 关闭 copyIn()
  • 等待复印机完成
  • 关闭 OutputStream / InputStream

等待任务完成具有将任何异常传播到主线程的额外好处。

由于 Connection 还有一个潜在的死锁:如果写入器线程填满了管道的缓冲区,它将阻塞直到读取器开始使用数据,如果它们按顺序执行,则永远不会发生这种情况。使用 newSingleThreadExecutor() 应该可以解决这个问题。

考虑到所有这些:

newFixedThreadPool(2)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...