带有计时器的观察者模式

问题描述

| 我已经为我的应用程序使用了观察者模式。 我有一个主题,其中有一个名为\'tmr \'的System.Timers.Timer对象。每60秒触发一次此计时器的滴答事件。在此滴答事件中,我将通知与我的主题相关的所有观察者。我使用了for循环遍历我的观察者列表,然后触发观察者更新方法。 假设我有10位观察员与我的学科相关。 每个观察者需要10秒钟才能完成其处理。 现在,在for循环中完成通知会导致在90秒后调用最后一个Observer的Update方法。即只有在前一个观察者更新方法完成处理后,才能调用一个观察者更新方法。 但这不是我在应用程序中想要的。我需要在计时器滴答发生时立即触发所有观察者的Update方法。这样观察员就不必等待。我希望这可以通过线程来完成。 因此,我将代码修改
// Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }
但是我有两个疑问 如果有10个观察者 我的计时器间隔是60秒 然后,语句new Thread()将触发600次。 建议在每个计时器刻度上创建新线程是否有效? 如果我的观察者花太多时间来完成其更新逻辑,即超过60秒,该怎么办?表示计时器滴答发生在更新观察者之前。我该如何控制? 我可以发布示例代码..如果需要... 我使用的代码
using System;
using System.Collections.Generic;
using System.Timers;
using System.Text;
using Threading = System.Threading;
using System.ComponentModel;

namespace singletimers
{
  class Program
  {


    static void Main(string[] args)
    {
      DataPullerSubject.Instance.Attach(Observer1.Instance);
      DataPullerSubject.Instance.Attach(Observer2.Instance);
      Console.ReadKey();
    }
  }

  public sealed class DataPullerSubject
  {
    private static volatile DataPullerSubject instance;
    private static object syncRoot = new Object();
    public static DataPullerSubject Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new DataPullerSubject();
          }
        }

        return instance;
      }
    }

    int interval = 10 * 1000;
    Timer tmr;
    private List<Observer> _observers = new List<Observer>();

    DataPullerSubject()
    {
      tmr = new Timer();
      tmr.Interval = 1; // first time to call instantly
      tmr.Elapsed += new ElapsedEventHandler(tmr_Elapsed);
      tmr.Start();
    }

    public void Attach(Observer observer)
    {
      _observers.Add(observer);
    }

    public void Detach(Observer observer)
    {
      _observers.Remove(observer);
    }

    // Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }

    private void tmr_Elapsed(object source,ElapsedEventArgs e)
    {
      tmr.Interval = interval;
      tmr.Stop(); // stop the timer until all notification triggered
      this.Notify();
      tmr.Start();//start again
    }
  }


  public abstract class Observer
  {
    string data;
    public abstract void Update();
    public virtual void GetDataFromDBAndSetToDataSet(string param)
    {
      Console.WriteLine(\"Processing for: \" + param);
      data = param + new Random().Next(1,2000);
      Threading.Thread.Sleep(10 * 1000);//long work
      Console.WriteLine(\"Data set for: \" + param);
    }
  }


  public sealed class Observer1 : Observer
  {
    private static volatile Observer1 instance;
    private static object syncRoot = new Object();
    public static Observer1 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer1();
          }
        }

        return instance;
      }
    }
    Observer1()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet(\"Observer1\");
    }

  }

  public sealed class Observer2 : Observer
  {
    private static volatile Observer2 instance;
    private static object syncRoot = new Object();
    public static Observer2 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer2();
          }
        }

        return instance;
      }
    }
    Observer2()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet(\"Observer2\");
    }

  }
}
谢谢。     

解决方法

不推荐使用
new Thread
。使用
Task
Task<T>
创建Observable模式框架的最佳尝试可能只会接近Rx。使用可以解决您提到的问题的方法(即,如果处理花费太多时间)。 Rx将为您提供巨大的灵活性来定义可观察的场景。     ,1)您可以通过ThreadPool.QueueUserWorkItem使用ThreadPool中的线程,也可以使用Tasks 2)您必须同步您的方法。     ,或者,观察者可以以非阻塞方式实现Update。 也就是说,更新总是立即返回。然后,如有必要,观察者对象有责任在新线程中执行其工作。 我不确定这对您的情况是否有帮助-我不知道您的“观察者”是什么,但是也许您也不知道?