如何在 C# 中监听 CouchDB 的新数据?

问题描述

我写了一个函数,它从数据库(CouchDB)中获取新到达的数据,然后对这些数据进行分析。这部分工作正常,但我不知道在暂时没有新数据时,如何让这个函数保持运行并继续等待。

我目前的代码:

/// <summary>
/// Reads the CouchDB <c>_changes</c> feed, parses every change document it returns and
/// posts the image attachment of the first change to the analysis API.
/// </summary>
/// <returns>
/// The deserialized API response for the first change, or <c>null</c> when the feed
/// contained no change documents.
/// </returns>
/// <exception cref="HttpRequestException">
/// The CouchDB request failed or returned a non-success status code.
/// </exception>
public async Task<FsResponseModel.Root> AnalyzeImage()
    {
        //Somewhere to keep the parsed results
        var results = new List<ViewResponseModel>();

        using (HttpClient client = new HttpClient())
        {
            client.DefaultRequestHeaders.Add("Accept","application/json");
            client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes("user:pw")));

            // The CouchDB query parameter is the lowercase "feed".
            using (HttpResponseMessage httpResponse = await client.GetAsync("CouchDBUrl/_changes?feed=continuous&filter=_view&view=list/images&include_docs=true&attachments=true"))
            {
                httpResponse.EnsureSuccessStatusCode();
                string json = await httpResponse.Content.ReadAsStringAsync();

                using (var stringReader = new StringReader(json))
                using (var jsonReader = new JsonTextReader(stringReader))
                {
                    //Make sure the JsonReader knows we have multiple documents
                    jsonReader.SupportMultipleContent = true;

                    while (jsonReader.Read())
                    {
                        //Read in the next document
                        var nextObject = JObject.ReadFrom(jsonReader);

                        //The document carrying "last_seq" is the feed trailer, not a change
                        if (nextObject["last_seq"] == null)
                        {
                            results.Add(nextObject.ToObject<ViewResponseModel>());
                        }
                    }
                }
            }
        }

        // Guard against an empty feed: indexing results[0] below would throw otherwise.
        if (results.Count == 0)
        {
            return null;
        }

        // Only the first change is analyzed because the method returns a single response.
        ViewResponseModel first = results[0];
        string base64Image = first.doc._attachments.image.data;
        string refNr = first.doc.creator;
        string token = "myToken";

        var httpWebRequest = (HttpWebRequest)WebRequest.Create("MyApi");
        httpWebRequest.PreAuthenticate = true;
        httpWebRequest.Headers.Add("Authorization","Bearer " + token);
        httpWebRequest.ContentType = "application/json";
        httpWebRequest.Accept = "application/json";
        httpWebRequest.Method = "POST";

        var obj = new FsRequestModel
        {
            Image = base64Image,
            Application = "MyApp",
            RefNr = "hzd." + refNr + first.doc._id
        };

        string imageJson = JsonConvert.SerializeObject(obj);

        // Use the async request/response APIs so the calling thread is not blocked.
        using (var streamWriter = new StreamWriter(await httpWebRequest.GetRequestStreamAsync()))
        {
            await streamWriter.WriteAsync(imageJson);
        }

        using (var httpResponseApi = (HttpWebResponse)await httpWebRequest.GetResponseAsync())
        using (var streamReader = new StreamReader(httpResponseApi.GetResponseStream()))
        {
            string result = await streamReader.ReadToEndAsync();
            return JsonConvert.DeserializeObject<FsResponseModel.Root>(result);
        }
    }

我需要一个解决方案来告诉代码它必须等待新数据,如果新数据可用,则重新开始该过程。

编辑:

我现在正在尝试直接读取持续返回的 feed(数据流)。有人知道该如何消费它吗?

 /// <summary>
 /// Opens the CouchDB continuous <c>_changes</c> feed and handles each change line as it
 /// arrives. The method completes only when the server ends the response stream.
 /// </summary>
 /// <exception cref="HttpRequestException">
 /// The request failed or CouchDB answered with a non-success status code.
 /// </exception>
 public async Task ObserveDbAndTrigger()
    {
        // The CouchDB query parameter is the lowercase "feed".
        var url = "http://localhost:5984/mydb/_changes?feed=continuous&filter=_view&view=MyView&include_docs=true&attachments=true&heartbeat=1000";

        using (var client = new HttpClient())
        {
            // The feed stays open indefinitely, so the default request timeout must be disabled.
            client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
            client.DefaultRequestHeaders.Add("Accept","application/json");
            client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic",Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes("user:pw")));

            using (var request = new HttpRequestMessage(HttpMethod.Get,url))
            // ResponseHeadersRead returns as soon as the headers arrive instead of
            // buffering a body that never ends.
            using (var response = await client.SendAsync(
                request,HttpCompletionOption.ResponseHeadersRead))
            {
                response.EnsureSuccessStatusCode();

                using (var body = await response.Content.ReadAsStreamAsync())
                using (var reader = new StreamReader(body))
                {
                    string line;

                    // ReadLineAsync yields null once the server closes the stream.
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        // Empty lines carry no change document (e.g. heartbeat newlines).
                        if (line.Length == 0)
                        {
                            continue;
                        }

                        //get data and work with it
                    }
                }
            }
        }
    }

我已经拿到了流,但不知道如何处理传入的每一行数据。任何帮助都将不胜感激。

编辑 2:

基于 Stream 的方案行不通:我已经测试过,它无法识别新到达的行……

我的下一个想法是改用 eventsource feed,而不是 continuous feed。

对这个想法有什么建议吗?

解决方法

我把代码改成了下面这样(改用 eventsource feed,并通过 TcpClient 逐行读取响应),现在可以处理数据了。

/// <summary>
/// Console entry point: subscribes to the publisher's events, starts listening to the
/// CouchDB eventsource feed in the background and stops when a key is pressed.
/// </summary>
static void Main()
        {
            string database = "myDb";
            string credentials = String.Join(":","user","password");
            string authHeader = "Basic " + Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(credentials));
            string changesQuery = database + "/_changes?feed=eventsource&include_docs=true&attachments=true&since=0&heartbeat=5000";

            ChangesPublisher publisher = new ChangesPublisher();

            // Print every received line, and any failure reported by the publisher.
            publisher.OnChange += (sender,args) => Console.WriteLine(args.Value);
            publisher.OnException += (sender,args) => Console.WriteLine(args.Value.ToString() + "\r\n\r\nPress a key to exit.");

            // Start publishing on a background task so the key press below is not blocked.
            Task.Run(() => publisher.Begin("localhost",5984,changesQuery,authHeader));

            // Wait for a key press, then ask the publisher to stop.
            Console.ReadKey();

            publisher.Stop = true;

        }
        /// <summary>
        /// Sends a raw HTTP GET over a TCP connection and republishes every response line
        /// through <see cref="OnChange"/>. Failures are reported through
        /// <see cref="OnException"/> instead of being thrown to the caller.
        /// </summary>
        public class ChangesPublisher
        {
            /// <summary>
            /// Set to <c>true</c> to end the read loop. The flag is checked between reads,
            /// so the loop exits after the next line arrives.
            /// </summary>
            public bool Stop { get; set; }

            /// <summary>Event payload carrying one line read from the response.</summary>
            public class ChangeEvent : EventArgs
            {
                public string Value { get; set; }
                public ChangeEvent(string value)
                {
                    Value = value;
                }
            }

            /// <summary>Event payload carrying the exception that ended the read loop.</summary>
            public class ExceptionEvent : EventArgs
            {
                public Exception Value { get; set; }
                public ExceptionEvent(Exception value)
                {
                    Value = value;
                }
            }

            // Initialized with empty delegates so the events can be raised without null checks.
            public event EventHandler<ChangeEvent> OnChange = delegate { };
            public event EventHandler<ExceptionEvent> OnException = delegate { };

            /// <summary>
            /// Connects to the server, sends the GET request and raises <see cref="OnChange"/>
            /// for each line received until <see cref="Stop"/> is set or the server closes
            /// the connection.
            /// </summary>
            /// <param name="serverAddr">Host name or address to connect to; also sent as the Host header.</param>
            /// <param name="port">TCP port of the server.</param>
            /// <param name="query">Path and query string requested, without the leading slash.</param>
            /// <param name="auth">Complete value of the Authorization header.</param>
            public async Task Begin(string serverAddr,int port,string query,string auth)
            {
                using (var client = new TcpClient())
                {
                    // The trailing "\r\n" element yields the blank line that terminates the headers.
                    string request = String.Join("\r\n",new List<string> {
                    String.Format("GET /{0} HTTP/1.1",query),"Authorization: " + auth,"Accept: application/json","Host: " + serverAddr,"Connection: keep-alive","\r\n"
                });

                    try
                    {
                        await client.ConnectAsync(serverAddr,port);
                        using (NetworkStream stream = client.GetStream())
                        {
                            StreamWriter writer = new StreamWriter(stream);
                            await writer.WriteAsync(request);
                            await writer.FlushAsync();

                            StreamReader reader = new StreamReader(stream);
                            while (!Stop)
                            {
                                string data = await reader.ReadLineAsync();
                                if (data == null)
                                {
                                    // End of stream: the server closed the connection. Without
                                    // this exit the loop would spin forever publishing nulls.
                                    break;
                                }

                                OnChange(this,new ChangeEvent(data));

                                //work with data

                            }
                        }
                    }
                    catch (Exception e)
                    {
                        OnException(this,new ExceptionEvent(e));
                    }
                }
            }
        }