2012-09-20 13 views
21

से कनेक्ट हो रहा है, इसलिए मेरे पास एक विंडोज़ सेवा प्रक्रिया है जो वर्कफ़्लो प्रक्रिया करता है। बैक एंड एडीएमएक्स से उत्पन्न इकाइयों के वर्ग के साथ इकाई फ्रेमवर्क के शीर्ष पर रिपोजिटरी और यूनिटफवर्क पैटर्न और एकता का उपयोग करता है। मैं पूरी तरह से विस्तार से नहीं जाऊंगा क्योंकि यह जरूरी नहीं है लेकिन मूल रूप से वर्कफ़्लो के माध्यम से 5 कदम हैं। एक विशेष प्रक्रिया किसी भी समय किसी भी समय (पाठ्यक्रम के क्रम में) हो सकती है। चरण एक केवल चरण दो के लिए डेटा उत्पन्न करता है, जो किसी अन्य सर्वर पर लंबे समय तक चलने वाली प्रक्रिया के माध्यम से डेटा को मान्य करता है। फिर वहां उस डेटा के साथ एक पीडीएफ उत्पन्न करता है। प्रत्येक चरण के लिए हम एक टाइमर उत्पन्न करते हैं, हालांकि यह प्रत्येक चरण के लिए एक से अधिक टाइमर बनने की अनुमति देने के लिए कॉन्फ़िगर करने योग्य है। इसमें समस्या आती है। जब मैं किसी विशेष चरण में प्रोसेसर जोड़ता हूं, तो यह निम्न त्रुटि को यादृच्छिक रूप से:मल्टीथ्रेडिंग इकाई फ्रेमवर्क: कनेक्शन बंद नहीं था। कनेक्शन का वर्तमान स्थिति

कनेक्शन बंद नहीं था। कनेक्शन की वर्तमान स्थिति कनेक्ट हो रही है।

इस पर पढ़ना यह स्पष्ट प्रतीत होता है कि ऐसा इसलिए हो रहा है क्योंकि संदर्भ दो धागे से उसी इकाई तक पहुंचने का प्रयास कर रहा है। लेकिन यह वह जगह है जहां यह मुझे एक पाश के लिए फेंकने की तरह है। इस राज्य पर मुझे मिली सारी जानकारी जो हमें प्रति थ्रेड के उदाहरण संदर्भ का उपयोग करनी चाहिए। जहां तक ​​मैं कह सकता हूं कि मैं कर रहा हूं (नीचे दिए गए कोड को देखें)। मैं सिंगलटन पैटर्न या स्टेटिक्स या कुछ भी नहीं उपयोग कर रहा हूं इसलिए मुझे सच में यकीन नहीं है कि यह क्यों हो रहा है या इससे कैसे बचें। मैंने आपकी समीक्षा के लिए नीचे दिए गए कोड के प्रासंगिक बिट्स पोस्ट किए हैं।

आधार भंडार:

public class BaseRepository 
{ 
    /// <summary> 
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public BaseRepository(IUnitOfWork unitOfWork) 
    { 
     if (unitOfWork == null) throw new ArgumentException("unitofWork"); 
     UnitOfWork = unitOfWork; 
    } 


    /// <summary> 
    /// Returns a <see cref="DbSet"/> of entities. 
    /// </summary> 
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam> 
    /// <returns></returns> 
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class 
    { 

     return Context.Set<TEntity>(); 
    } 

    /// <summary> 
    /// Sets the state of an entity. 
    /// </summary> 
    /// <param name="entity">object to set state.</param> 
    /// <param name="entityState"><see cref="EntityState"/></param> 
    protected virtual void SetEntityState(object entity, EntityState entityState) 
    { 
     Context.Entry(entity).State = entityState; 
    } 

    /// <summary> 
    /// Unit of work controlling this repository.  
    /// </summary> 
    protected IUnitOfWork UnitOfWork { get; set; } 

    /// <summary> 
    /// 
    /// </summary> 
    /// <param name="entity"></param> 
    protected virtual void Attach(object entity) 
    { 
     if (Context.Entry(entity).State == EntityState.Detached) 
      Context.Entry(entity).State = EntityState.Modified; 
    } 

    protected virtual void Detach(object entity) 
    { 
     Context.Entry(entity).State = EntityState.Detached; 
    } 

    /// <summary> 
    /// Provides access to the ef context we are working with 
    /// </summary> 
    internal StatementAutoEntities Context 
    { 
     get 
     {     
      return (StatementAutoEntities)UnitOfWork; 
     } 
    } 
} 

StatementAutoEntities स्वत: जनरेट की एफई वर्ग है।

भंडार कार्यान्वयन:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository 
{ 

    /// <summary> 
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/> 
    /// </summary> 
    /// <param name="unitOfWork"></param> 
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork) 
    { 
    } 

    /// <summary> 
    /// Create a new <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Create(ProcessingQueue Queue) 
    { 
     GetDbSet<ProcessingQueue>().Add(Queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Updates a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Update(ProcessingQueue queue) 
    { 
     //Attach(queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Delete a <see cref="ProcessingQueue"/> entry in database 
    /// </summary> 
    /// <param name="Queue"> 
    ///  <see cref="ProcessingQueue"/> 
    /// </param> 
    public void Delete(ProcessingQueue Queue) 
    { 
     GetDbSet<ProcessingQueue>().Remove(Queue); 
     UnitOfWork.SaveChanges(); 
    } 

    /// <summary> 
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id 
    /// </summary> 
    /// <param name="id"></param> 
    /// <returns></returns> 
    public ProcessingQueue GetById(int id) 
    { 
     return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault(); 
    } 

    /// <summary> 
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status 
    /// </summary> 
    /// <param name="status"></param> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetByStatus(int status) 
    { 
     return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList(); 
    } 

    /// <summary> 
    /// Gets a list of all <see cref="ProcessingQueue"/> entries 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueue> GetAll() 
    { 
     return (from e in Context.ProcessingQueue_Select() select e).ToList(); 
    } 

    /// <summary> 
    /// Gets the next pending item id in the queue for a specific work   
    /// </summary> 
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param> 
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param> 
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param> 
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns> 
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId) 
    { 
     var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId, operationId).SingleOrDefault(); 
     return id.HasValue ? id.Value : -1; 
    } 

    /// <summary> 
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all 
    /// active entries in the queue 
    /// </summary> 
    /// <returns></returns> 
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries() 
    { 
     return (from e in Context.ProcessingQueueStatus_Select() select e).ToList(); 
    } 
    /// <summary> 
    /// Bumps an entry to the front of the queue 
    /// </summary> 
    /// <param name="processingQueueId"></param> 
    public void Bump(int processingQueueId) 
    { 
     Context.ProcessingQueue_Bump(processingQueueId); 
    } 
} 

हम निर्भरता इंजेक्शन, उदाहरण के लिए कुछ बुला कोड के लिए एकता का उपयोग करें:

#region Members 
    private readonly IProcessingQueueRepository _queueRepository;  
    #endregion 

    #region Constructors 
    /// <summary>Initializes ProcessingQueue services with repositories</summary> 
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>   
    public ProcessingQueueService(IProcessingQueueRepository queueRepository) 
    { 
     Check.Require(queueRepository != null, "processingQueueRepository is required"); 
     _queueRepository = queueRepository; 

    } 
    #endregion 

खिड़कियों सेवा है कि टाइमर इस प्रकार है की शुरूआत में कोड:

  _staWorkTypeConfigLock.EnterReadLock(); 
     foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
           let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
           select new StaTimer 
          { 
           Interval = _runImmediate ? 5000 : interval*1000, 
           Operation = (ProcessingQueue.RequestedOperation) operation.OperationId 
          }) 
     { 
      timer.Elapsed += ApxQueueProcessingOnElapsedInterval; 
      timer.Enabled = true; 
      Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);     
     } 
     _staWorkTypeConfigLock.ExitReadLock(); 

स्टाटाइमर टाइमर जोड़ने ऑपरेशन प्रकार पर केवल एक रैपर है। ApxQueueProcessingOnElapsedInterval तो मूल रूप से केवल ऑपरेशन के आधार पर प्रक्रिया को कार्य सौंपा जाता है।

मैं ApxQueueProcessingOnElapsedInterval कोड का थोड़ा सा जोड़ दूंगा जहां हम कार्यों को बढ़ा रहे हैं।

  _staTasksLock.EnterWriteLock(); 
     for (var x = 0; x < tasksNeeded; x++) 
     { 
      var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj), 
          CreateQueueProcessConfig(true, operation), _cancellationToken); 


      _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t)); 

      t.Start(); 
      Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table 
     } 
     _staTasksLock.ExitWriteLock(); 
+0

क्या आप आईओसी कंटेनर संदर्भ उदाहरणों का निपटान करते हैं? –

उत्तर

26

ऐसा लगता है कि आपकी सेवा, भंडार और संदर्भ आपके आवेदन के पूरे जीवनकाल के लिए जीने के लिए माना जाता है लेकिन यह गलत है। आप एक ही समय में कई टाइमर ट्रिगर कर सकते हैं। इसका अर्थ यह है कि एकाधिक थ्रेड समानांतर में आपकी सेवा का उपयोग करेंगे और वे आपके थ्रेड = संदर्भ में आपकी सेवा के कोड को निष्पादित करेंगे, एकाधिक थ्रेड => अपवाद के बीच साझा किया जाता है क्योंकि संदर्भ थ्रेड सुरक्षित नहीं है।

एकमात्र विकल्प प्रत्येक ऑपरेशन के लिए एक नया संदर्भ उदाहरण का उपयोग करना है जिसे आप निष्पादित करना चाहते हैं। उदाहरण के लिए आप संदर्भ के बजाय संदर्भ कारखाने को स्वीकार करने के लिए अपनी कक्षाओं को बदल सकते हैं और प्रत्येक ऑपरेशन के लिए एक नया संदर्भ प्राप्त कर सकते हैं।

मेरे मामले में, मैं यह सुनिश्चित किया है कि गैर धागा सुरक्षित DbContext एक (Ninject का उपयोग) TransientLifetime था, लेकिन यह अभी भी संगामिति मुद्दों पैदा कर रहा था:

+0

मैंने यह दिखाने के लिए कोड का एक छोटा सा जोड़ा कि सेवाओं को कैसे बनाया जा रहा है। मैं एक कार्य का उपयोग कर रहा हूं जो ProcessStaQueue को उत्पन्न करता है जो तब काम करता है कि इसे किस ऑपरेशन को चलाया जाना चाहिए। मुझे पता है कि संदर्भ थ्रेड सुरक्षित नहीं है लेकिन यह देखते हुए कि मैं स्टेटिक्स या सिंगलटन पैटर्न या कुछ भी नहीं उपयोग कर रहा हूं, क्या प्रत्येक कार्य को निष्पादित करने के लिए स्वतंत्र रूप से तत्काल तत्काल नहीं किया जाना चाहिए? – Brandon

+0

सेवा अंततः इस तरह तत्काल हो रही है: ServiceLocator.Current.GetInstance (); क्या इसका शायद इसके साथ कुछ करना है? मेरा अनुमान है कि हो सकता है कि हो सकता है कि सेवा लोकेटर के पास ऐसा कुछ हो। – Brandon

+0

लेकिन यह सेवा लोकेटर के कार्यान्वयन पर निर्भर है यदि यह हर बार जब आप इसके लिए पूछते हैं तो सेवा का एक नया उदाहरण बनाता है या यदि यह केवल पहले अनुरोध के लिए उदाहरण बनाता है और उदाहरण का पुन: उपयोग करने से पहले। –

3

मामले में यह किसी को भी मदद करता है! यह पता चला है कि मेरे कुछ कस्टम ActionFilters में मैंने कन्स्ट्रक्टर में DbContext तक पहुंच प्राप्त करने के लिए निर्भरता इंजेक्शन का उपयोग किया, लेकिन ActionFilters में एक जीवन भर है जो उन्हें कई अनुरोधों पर तत्काल रखता है, इसलिए संदर्भ पुनर्निर्मित नहीं हुआ।

मैंने इसे कन्स्ट्रक्टर के बजाय OnActionExecuting विधि में निर्भरता को मैन्युअल रूप से हल करके तय किया ताकि यह हर बार एक ताजा उदाहरण हो।

-1

मेरे मामले में मुझे यह समस्या मिल रही थी क्योंकि मैं अपने डीएएल फ़ंक्शन कॉल में से एक से पहले await कीवर्ड भूल गया था। await डालकर इसे हल किया गया।

+0

आवाइक एक थ्रेडिंग को बहुप्रचारित नहीं करता है – Merta

संबंधित मुद्दे