在服务器上加载CSV数据,将数据转换为JSON并使用Golang使用Json查询获得结果

问题描述

我正在尝试构建一个TCP服务器,该服务器从 CSV 文件加载数据集并提供查询数据集的接口。 TCP服务器将公开端口4040。CSV文件包含与日冕病毒案例相关的以下列:

  • 累积测试阳性
  • 进行的累积测试
  • 日期
  • 出院
  • 已过期
  • 已入学
  • 地区 在基于Linux / Unix的系统上,用户应该能够使用 NetCat nc localhost 4040命令连接到服务器。

连接到TCP后,用户应该能够通过发送JSON格式的查询来与应用程序进行通信。

{
    "query": {
        "region": "Sindh"
    }
}
{
    "query": {
        "date": "2020-03-20"
    }
}

我的server.go

package main

import (
    "fmt"
    "net"
    "os"
    "flag"
    "log"
    "encoding/csv"
    "encoding/json"
    "bufio"
    "io"
    "strings"
)

type CovidPatient struct {
    Positive    string      `json:"Covid_Positive"`
    Performed   string      `json:"Coivd_Performed"`
    Date        string      `json:"Covid_Date"`
    discharged  string      `json:"Covid_discharged"`
    Expired     string      `json:"Covid_Expired"`
    Region      string      `json:"Covid_Region"`
    Admitted    string      `json:"Covid_Admitted"`
}

type DataRequest struct {   
    Get string `json:"get"`
}

type DataError struct {     
    Error string `json:"Covid_error"`
}

func Load(path string) []CovidPatient {
    table := make([]CovidPatient,0)
    var patient CovidPatient
    file,err := os.Open(path)
    if err != nil {
        panic(err.Error())
    }
    defer file.Close()

    reader := csv.NewReader(file)
    csvData,err := reader.ReadAll()
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    for _,row := range csvData{
        patient.Positive =  row[0]
        patient.Performed =  row[1]
        patient.Date =       row[2]
        patient.discharged = row[3]
        patient.Expired =    row[4]
        patient.Region =     row[5]
        patient.Admitted =   row[6]
        table = append(table,patient)
    }
    return table
}

func Find(table []CovidPatient,filter string) []CovidPatient {
    if filter == "" || filter == "*" {
        return table
    }
    result := make([]CovidPatient,0)
    filter = strings.toupper(filter)
    for _,cp := range table {
        if cp.Date == filter ||
            cp.Region == filter ||
            strings.Contains(strings.toupper(cp.Positive),filter)      ||
            strings.Contains(strings.toupper(cp.Performed),filter)     ||
            strings.Contains(strings.toupper(cp.Date),filter)          ||
            strings.Contains(strings.toupper(cp.discharged),filter)    ||
            strings.Contains(strings.toupper(cp.Expired),filter)       ||
            strings.Contains(strings.toupper(cp.Region),filter)        ||
            strings.Contains(strings.toupper(cp.Admitted),filter){
            result = append(result,cp)
        }
    }
    return result
}

var (
    patientsDetail = Load("./covid_final_data.csv")
)

func main(){
    var addr string
    var network string
    flag.StringVar(&addr,"e",":4040","service endpoint [ip addr or socket path]")
    flag.StringVar(&network,"n","tcp","network protocol [tcp,unix]")
    flag.Parse()

    switch network {
    case "tcp","tcp4","tcp6","unix":
    default:
        fmt.Println("unsupported network protocol")
        os.Exit(1)
    }

    ln,err := net.Listen(network,addr)
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }
    defer ln.Close()
    log.Println("Covid19 Condition in Pakistan")
    log.Printf("Service started: (%s) %s\n",network,addr)
    
    for {
        conn,err := ln.Accept()
        if err != nil {
            log.Println(err)
            conn.Close()
            continue
        }
        log.Println("Connected to ",conn.RemoteAddr())
        go handleConnection(conn)
    }
}
func handleConnection(conn net.Conn) {
    defer func() {
        if err := conn.Close(); err != nil {
            log.Println("error closing connection:",err)
        }
    }()

    reader := bufio.NewReaderSize(conn,4)

    for {
        buf,err := reader.ReadSlice('}')
        if err != nil {
            if err != io.EOF {
                log.Println("connection read error:",err)
                return
            }
        }
        reader.Reset(conn)
        
        var req DataRequest
        if err := json.Unmarshal(buf,&req); err != nil {
            log.Println("Failed to unmarshal request:",err)
            cerr,jerr := json.Marshal(DataError{Error: err.Error()})
            if jerr != nil {
                log.Println("Failed to marshal DataError:",jerr)
                continue
            }
            if _,werr := conn.Write(cerr); werr != nil {
                log.Println("Failed to write to DataError:",werr)
                return
            }
            continue
        }

        result := Find(patientsDetail,req.Get)

        rsp,err := json.Marshal(&result)
        if err != nil {
            log.Println("Failed to marshal data:",err)
            if _,err := fmt.Fprintf(conn,`{"data_error":"internal error"}`); err != nil {
                log.Printf("Failed to write to client: %v",err)
                return
            }
            continue
        }
        if _,err := conn.Write(rsp); err != nil {
            log.Println("Failed to write response:",err)
            return
        }
    }
}

这可以正确加载csv并将其转换为JSON。但是,当我尝试使用NetCat命令运行查询时,它返回空的JSON元素。请指导我哪里出错了。

解决方法

猜你想要这个:

╭─root@DESKTOP-OCDRD7Q ~
╰─# nc localhost 4040
{"get": "Sindh"}
[{"Covid_Positive":"1","Coivd_Performed":"1","Covid_Date":"1","Covid_Discharged":"1","Covid_Expired":"1","Covid_Region":"Sindh","Covid_Admitted":"1"}]

您应该做的只是修改您的json请求。


package main

import (
    "bufio"
    "encoding/csv"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "log"
    "net"
    "os"
)

type CovidPatient struct {
    Positive   string `json:"Covid_Positive"`
    Performed  string `json:"Coivd_Performed"`
    Date       string `json:"Covid_Date"`
    Discharged string `json:"Covid_Discharged"`
    Expired    string `json:"Covid_Expired"`
    Region     string `json:"Covid_Region"`
    Admitted   string `json:"Covid_Admitted"`
}

type DataRequest struct {
    Get CovidPatient `json:"get"`
}

type DataError struct {
    Error string `json:"Covid_error"`
}

func Load(path string) []CovidPatient {
    table := make([]CovidPatient,0)
    var patient CovidPatient
    file,err := os.Open(path)
    if err != nil {
        panic(err.Error())
    }
    defer file.Close()

    reader := csv.NewReader(file)
    csvData,err := reader.ReadAll()
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    for _,row := range csvData {
        patient.Positive = row[0]
        patient.Performed = row[1]
        patient.Date = row[2]
        patient.Discharged = row[3]
        patient.Expired = row[4]
        patient.Region = row[5]
        patient.Admitted = row[6]
        table = append(table,patient)
    }
    return table
}

func Find(table []CovidPatient,filter CovidPatient) []CovidPatient {

    result := make([]CovidPatient,0)

    log.Println(filter,table)

    for _,cp := range table {

        if filter.Positive == "" {
        } else if filter.Positive != cp.Positive {
            continue
        }
        if filter.Performed == "" {
        } else if filter.Performed != cp.Performed {
            continue
        }
        if filter.Date == "" {
        } else if filter.Date != cp.Date {
            continue
        }
        if filter.Discharged == "" {
        } else if filter.Discharged != cp.Discharged {
            continue
        }
        if filter.Expired == "" {
        } else if filter.Expired != cp.Expired {
            continue
        }
        if filter.Region == "" {
        } else if filter.Region != cp.Region {
            continue
        }
        if filter.Admitted == "" {
        } else if filter.Admitted != cp.Admitted {
            continue
        }

        result = append(result,cp)
    }
    return result

}

var (
    patientsDetail = Load("./covid_final_data.csv")
)

func main() {
    log.SetFlags(log.Lshortfile | log.Ltime)
    var addr string
    var network string
    flag.StringVar(&addr,"e",":4040","service endpoint [ip addr or socket path]")
    flag.StringVar(&network,"n","tcp","network protocol [tcp,unix]")
    flag.Parse()

    switch network {
    case "tcp","tcp4","tcp6","unix":
    default:
        fmt.Println("unsupported network protocol")
        os.Exit(1)
    }

    ln,err := net.Listen(network,addr)
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }
    defer ln.Close()
    log.Println("Covid19 Condition in Pakistan")
    log.Printf("Service started: (%s) %s\n",network,addr)

    for {
        conn,err := ln.Accept()
        if err != nil {
            log.Println(err)
            conn.Close()
            continue
        }
        log.Println("Connected to ",conn.RemoteAddr())
        go handleConnection(conn)
    }
}
func handleConnection(conn net.Conn) {
    defer func() {
        if err := conn.Close(); err != nil {
            log.Println("error closing connection:",err)
        }
    }()

    reader := bufio.NewReaderSize(conn,100)

    for {
        buf,err := reader.ReadBytes('|')
        if err != nil {
            if err != io.EOF {
                log.Println("connection read error:",err)
                return
            }
        }
        reader.Reset(conn)

        var req DataRequest
        if err := json.Unmarshal(buf[:len(buf)-1],&req); err != nil {
            log.Println("failed to unmarshal request:",string(buf),err)
            cerr,jerr := json.Marshal(DataError{Error: err.Error()})
            if jerr != nil {
                log.Println("failed to marshal DataError:",jerr)
                continue
            }
            if _,werr := conn.Write(cerr); werr != nil {
                log.Println("failed to write to DataError:",werr)
                return
            }
            continue
        }

        result := Find(patientsDetail,req.Get)

        rsp,err := json.Marshal(&result)
        if err != nil {
            log.Println("failed to marshal data:",err)
            if _,err := fmt.Fprintf(conn,`{"data_error":"internal error"}`); err != nil {
                log.Printf("failed to write to client: %v",err)
                return
            }
            continue
        }
        if _,err := conn.Write(rsp); err != nil {
            log.Println("failed to write response:",err)
            return
        }
    }
}

查询是:

╭─root@DESKTOP-OCDRD7Q ~
╰─# nc localhost 4040                                                                                             127 ↵
{
    "get": {
        "Covid_Region": "Sindh","Covid_Date": "2020-03-20"
    }
}|
[{"Covid_Positive":"1","Covid_Date":"2020-03-20","Covid_Admitted":"1"}]
,

在函数handleConnection中,第一件事是“读取直到找到第一个}”,想象用户正在发送请求:

{ "get": { "Covid_Region": "Sindh","Covid_Date": "2020-03-20" } }

然后该步骤读取:

{ "get": { "Covid_Region": "Sindh","Covid_Date": "2020-03-20" }

请注意,结尾的}丢失了,然后json.Unmarshal试图取消对没有最后一个}(这是无效的json)的查询的封送。

此问题可以利用JSON流解码,换句话说,使用json.NewDecoder(r io.Reader)代替json.Unmarshal。让我复制并修改该函数的第一部分:

func handleConnection(conn net.Conn) {
    defer func() {
        if err := conn.Close(); err != nil {
            log.Println("error closing connection:",err)
        }
    }()

    jsonDecoder := json.NewDecoder(conn) // A json decoder read a stream to find a
                                         // valid JSON and stop just the byte
                                         // after the JSON ends. Process can be
                                         // repeated.

    for {
        var req DataRequest
        err := jsonDecoder.Decode(&req)
        if err == io.EOF {
            log.Println("finish")
            return
        }
        if err != nil {
            log.Println("unmarshal:",err)
            return
        }

        result := Find(patientsDetail,req.Get) // Here query the system

        // ... 

现在可能可以了,但是您也可以利用json流在de for循环之前以jsonEncoder := json.NewEncoder(conn)的形式发送响应并发送请求,如下所示:

        err := jsonEncoder.Encode(&result)
        if err != nil {
            log.Println("failed to marshal data:",err)
            // ...
            continue
        }