2015-10-17 5 views
6

मैं सीखने के अभ्यास के तौर पर Elixir में एक जॉब क्यू (job queue) बना रहा हूँ। फ़िलहाल मेरे वर्कर्स (workers) को खुद को क्यू के साथ मैन्युअल रूप से पंजीकृत (register) करना पड़ता है (देखें MyQuestion.Worker.start_link)।

एलिक्सिर/ओटीपी (Elixir/OTP): पर्यवेक्षक (supervisor) द्वारा चाइल्ड प्रोसेस के स्पॉन और समाप्ति का पता लगाना

मैं चाहता हूँ कि मेरा पर्यवेक्षक (supervisor) ही उपलब्ध वर्कर्स को क्यू के साथ पंजीकृत/पुनरारंभ करे, क्योंकि ऐसा लगता है कि इससे वर्कर्स की टेस्टिंग आसान होगी और कपलिंग (coupling) कम होगी।

क्या नीचे दिए गए कोड में MyQuestion.Supervisor के अंदर मैंने जो बताया है, वैसा करने का कोई तरीका है?

defmodule MyQuestion.Supervisor do
  @moduledoc """
  Supervisor from the question: starts the job queue and two workers.

  `child_spawned/2` and `child_terminated/3` sketch the hooks the question is
  asking for. The `Supervisor` behaviour defines no such callbacks, so OTP
  never calls them. The answer gets the same effect by combining
  `Process.monitor/1` with `Supervisor.start_child/2`.
  """
  use Supervisor

  # Starts the (unnamed) supervisor and returns Supervisor.start_link/2's result.
  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    children = [
      worker(MyQuestion.JobQueue, []),
      worker(MyQuestion.Worker, [], id: :worker_0),
      worker(MyQuestion.Worker, [], id: :worker_1)
    ]

    # :rest_for_one restarts a crashed child and every child listed after it.
    # A queue restart therefore also restarts both workers, and their
    # start_link re-registers them with the new queue.
    supervise(children, strategy: :rest_for_one)
  end

  # LOOKING FOR SOMETHING LIKE THIS (hypothetical hook, never called by OTP):
  # on worker spawn, add the worker to the queue.
  # NOTE(review): MyQuestion.JobQueue as shown defines new_worker/1, not
  # add_new_worker/1 (unless the latter is in its elided part).
  def child_spawned(pid, {MyQuestion.Worker, _, _}) do
    MyQuestion.JobQueue.add_new_worker(pid)
  end

  # LOOKING FOR SOMETHING LIKE THIS (hypothetical hook, never called by OTP):
  # on termination, tell the queue to drop the worker and retry its job.
  # The reason/state could be used to mark the job as failed, or to extract
  # the job id from the worker state.
  def child_terminated(pid, _reason, _state) do
    MyQuestion.JobQueue.remove_worker(pid)
    MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
  end
end

defmodule MyQuestion.JobQueue do 
    # Job queue sketched in the question. Its state lives in an Agent
    # (initially an empty list) registered under this module's name.
    # Parts of this module were elided in the post ("<... snip ...>"),
    # so the block does not compile as shown.
    def start_link do 
    Agent.start_link(fn -> [] end, name: __MODULE__) 
    end 

    # Intended to record `pid` as an available worker in the Agent state.
    # The body is only a placeholder comment, so as written it returns nil.
    # NOTE(review): MyQuestion.Supervisor and MyQuestion.Worker call
    # add_new_worker/1, not new_worker/1. One of the names must change,
    # unless add_new_worker/1 lives in the elided part.
    def new_worker(pid) do 
    # register pid with agent state in available worker list, etc. 
    end 

    # Intended to find an idle worker and run the job on it (body elided).
    def add_job(job_description) do 
    # find idle worker and run job 
    <... snip ...> 
    end 

    <... snip ...> 
end 

defmodule MyQuestion.Worker do 
    use GenServer 
    # Starts a worker GenServer, then registers its pid with the job queue.
    # That registration is the coupling the question wants to move into the
    # supervisor.
    # The {:ok, worker} match crashes the caller if the GenServer fails to
    # start. The return value of the queue registration is ignored.
    # NOTE(review): init/1 is in the elided part; presumably it accepts [].
    def start_link do 
    # start worker 
    {:ok, worker} = GenServer.start_link(__MODULE__, []) 

    # Now we have a worker pid, so we can register that pid with the queue 
    # I wish this could be in the supervisor or else where. 
    MyQuestion.JobQueue.add_new_worker(worker) 

    # must return gen server's start link 
    {:ok, worker} 
    end 

    <... snip ...> 
end 

उत्तर

1

कुंजी दो चीज़ों का संयोजन थी: Process.monitor(pid) को कॉल करना — तब मॉनिटर की गई प्रोसेस के बंद होने पर आपके handle_info को एक :DOWN संदेश मिलेगा — और Supervisor.start_child को मैन्युअल रूप से कॉल करना, जो आपको नई प्रोसेस का PID लौटाता है।

मैंने पहले handle_info का उपयोग करने की कोशिश की थी, लेकिन वह कभी कॉल ही नहीं हुआ। Process.monitor(pid) को उसी प्रोसेस से कॉल करना होता है जिसे सूचनाएँ (notifications) प्राप्त करनी हैं, इसलिए मॉनिटर को अपनी सर्वर प्रोसेस से जोड़ने के लिए आपको इसे handle_call फ़ंक्शन के अंदर से कॉल करना होगा। हो सकता है कि किसी कोड को दूसरी प्रोसेस के भीतर चलाने के लिए कोई फ़ंक्शन मौजूद हो (जैसे run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)), लेकिन मुझे ऐसा कुछ नहीं मिला।

साथ में एक जॉब क्यू का बहुत ही सरल (naive) कार्यान्वयन संलग्न है। मुझे Elixir सीखते हुए अभी सिर्फ़ एक दिन हुआ है, इसलिए कोड अव्यवस्थित भी है और Elixir के प्रचलित (idiomatic) तरीके के अनुसार भी नहीं है। फिर भी मैं इसे जोड़ रहा हूँ, क्योंकि इस विषय पर उदाहरण कोड की कमी है।

HeavyIndustry.JobQueue में handle_info और start_new_worker देखें। इस कोड में एक स्पष्ट समस्या है: यह क्रैश हुए वर्कर्स को पुनरारंभ तो कर लेता है, लेकिन उसी कोड (handle_info) से क्यू में अगला जॉब शुरू नहीं कर पाता। इसके लिए handle_info के अंदर GenServer.call की ज़रूरत पड़ती है, जिससे डेडलॉक हो जाता है। मुझे लगता है कि जॉब्स शुरू करने वाली प्रोसेस को जॉब्स ट्रैक करने वाली प्रोसेस से अलग करके इसे ठीक किया जा सकता है। अगर आप उदाहरण कोड चलाएँगे, तो देखेंगे कि आखिरकार क्यू आगे बढ़ना बंद कर देती है, जबकि उसमें अभी भी एक जॉब (:crash जॉब) बाकी होता है।

defmodule HeavyIndustry.Supervisor do
  @moduledoc """
  Supervisor for the job-queue example.

  It starts with no children. `create_children/2` adds
  `HeavyIndustry.JobQueue`, and the queue then starts its workers as
  children of this same supervisor.
  """
  use Supervisor

  @doc "Starts an unnamed supervisor with no children."
  def start_link, do: Supervisor.start_link(__MODULE__, :ok)

  def init(:ok) do
    # Children are attached later, through create_children/2.
    initial_children = []
    supervise(initial_children, strategy: :one_for_one)
  end

  @doc """
  Adds the job queue under `supervisor`, giving it `[supervisor, worker_count]`.

  Returns whatever `Supervisor.start_child/2` returns.
  """
  def create_children(supervisor, worker_count) do
    queue_args = [supervisor, worker_count]
    queue_spec = worker(HeavyIndustry.JobQueue, [queue_args])
    Supervisor.start_child(supervisor, queue_spec)
  end
end

defmodule HeavyIndustry.JobQueue do
  @moduledoc """
  A naive job queue that owns a pool of `HeavyIndustry.Worker` processes.

  Workers are started as `:temporary` children of the supervisor given at
  start-up, and this server monitors them. When a worker goes down,
  `handle_info/2` puts its job (if any) back to `:pending` and starts a
  replacement worker.

  Known limitation: a requeued job is not started from `handle_info/2`. It
  only runs again when a later `start_next_job/0` call picks it up.
  """
  use GenServer

  @job_queue_name __MODULE__

  ## Client API

  @doc """
  Starts the queue, registered under the name `HeavyIndustry.JobQueue`.

  `args` must be `[supervisor, max_workers]`. The second argument is ignored.
  It defaults to `[]`, so the queue can be started both as `start_link(args)`
  and as `start_link(args, _)`. The one-argument form is what the
  `Supervisor.Spec.worker/2` child spec in
  `HeavyIndustry.Supervisor.create_children/2` calls.
  """
  def start_link(args, _opts \\ []) do
    GenServer.start_link(__MODULE__, args, name: @job_queue_name)
  end

  def init([supervisor, max_workers]) do
    state = %{
      supervisor: supervisor,
      max_workers: max_workers,
      jobs: [],
      workers: %{idle: [], busy: []}
    }

    {:ok, state}
  end

  @doc """
  Starts `max_workers` workers under the supervisor and monitors them.

  Both steps run inside the queue process (the `:setup` call), because
  `Process.monitor/1` delivers the `:DOWN` message to the process that
  called it. Returns `:ok`.
  """
  def setup() do
    GenServer.call(@job_queue_name, :setup)
    :ok
  end

  @doc """
  Enqueues `job` and tries to start the next pending job.

  When the job finishes, its worker's answer is sent to `from` as
  `%{answer: result, job: job}`.

  Returns:

    * `{:ok, :started}` if this call started `job` itself.
    * `{:ok, :pending}` if `job` was accepted but not started by this call.
    * Any other `{:error, reason}` from the queue, passed through.
  """
  def add_job(from, job) do
    {:ok, our_job_id} =
      GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})

    case start_next_job() do
      {:ok, ^our_job_id} -> {:ok, :started}
      # an earlier job was started; ours is still queued
      {:ok, _other_job_id} -> {:ok, :pending}
      {:error, :no_idle_workers} -> {:ok, :pending}
      # a concurrent start_next_job (e.g. from a finishing job) already took ours
      {:error, :no_more_jobs} -> {:ok, :pending}
      {:error, reason} -> {:error, reason}
    end
  end

  @doc """
  Starts the first pending job in the queue on an idle worker.

  Returns `{:ok, job_id}`, `{:error, :no_idle_workers}`, or
  `{:error, :no_more_jobs}`.
  """
  def start_next_job do
    GenServer.call(@job_queue_name, :start_next_job)
  end

  ## Server callbacks

  def handle_call(:setup, _from, state) do
    # A plain 0..(n - 1) range would count downwards (and start workers)
    # when n is 0 or negative.
    workers =
      if state.max_workers > 0 do
        Enum.map(1..state.max_workers, fn _ -> start_monitored_worker(state.supervisor) end)
      else
        []
      end

    state = %{state | workers: %{state.workers | idle: workers}}
    {:reply, state, state}
  end

  # Kept for callers that monitor pids explicitly; setup/0 no longer needs it.
  def handle_call({:monitor_pids, pids}, _from, state) do
    Enum.each(pids, &Process.monitor/1)
    {:reply, :ok, state}
  end

  def handle_call({:create_job, %{job: job, reply_to: reply_to}}, _from, state) do
    new_job = %{
      job: job,
      reply_to: reply_to,
      # unique per job, unlike a clock reading taken twice in one tick
      id: make_ref(),
      # :pending -> :active -> removed once the worker has answered
      status: :pending,
      # the worker running this job while it is :active
      pid: nil
    }

    {:reply, {:ok, new_job.id}, %{state | jobs: state.jobs ++ [new_job]}}
  end

  def handle_call(:start_next_job, _from, state) do
    IO.puts "==> Start Next Job"
    IO.inspect state
    IO.puts "=================="

    # The case returns the updated state explicitly. Variables rebound inside
    # a case branch are not visible after it, so rebinding `state` there
    # would silently drop the job/worker bookkeeping.
    {reply, state} =
      case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
        {{:error, :no_idle_workers}, _} -> {{:error, :no_idle_workers}, state}
        {_, nil} -> {{:error, :no_more_jobs}, state}
        {{:ok, worker}, job} -> run_job(state, worker, job)
      end

    {:reply, reply, state}
  end

  def handle_call({:free_worker, worker}, _from, state) do
    workers = %{idle: state.workers.idle ++ [worker], busy: state.workers.busy -- [worker]}
    {:reply, :ok, %{state | workers: workers}}
  end

  def handle_call({:remove_job, job}, _from, state) do
    {:reply, :ok, %{state | jobs: state.jobs -- [job]}}
  end

  # A monitored worker went down: requeue its job (if any) and replace it.
  def handle_info({:DOWN, _ref, :process, dead_pid, reason}, state) do
    IO.puts "Worker collapsed: #{inspect reason} #{inspect dead_pid}, clear and restart job"

    # We are already inside the queue process, so we monitor the replacement
    # directly; a GenServer.call to ourselves here would deadlock.
    new_pid = start_monitored_worker(state.supervisor)

    workers = %{
      idle: (state.workers.idle -- [dead_pid]) ++ [new_pid],
      busy: state.workers.busy -- [dead_pid]
    }

    {:noreply, %{state | jobs: requeue_job_of(state.jobs, dead_pid), workers: workers}}
  end

  # Ignore any other message instead of crashing the queue on it.
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  ## Helpers

  # Starts one :temporary worker under `supervisor`. make_ref/0 gives each
  # child a unique id without creating an atom per worker. Atoms are never
  # garbage-collected, and a worker is created on every crash.
  defp start_new_worker(supervisor) do
    spec =
      Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: make_ref(), restart: :temporary)

    Supervisor.start_child(supervisor, spec)
  end

  # Starts a worker and monitors it from the calling process. The caller must
  # be the queue itself, so that :DOWN reaches handle_info/2.
  defp start_monitored_worker(supervisor) do
    {:ok, pid} = start_new_worker(supervisor)
    Process.monitor(pid)
    pid
  end

  # Marks `job` active on `worker` and runs it in an unlinked Task. The Task
  # lives outside the queue process, so it can report back through
  # GenServer.call without deadlocking. Returns {reply, new_state}.
  defp run_job(state, worker, job) do
    active_job = %{job | status: :active, pid: worker}
    jobs = (state.jobs -- [job]) ++ [active_job]
    workers = %{idle: state.workers.idle -- [worker], busy: state.workers.busy ++ [worker]}

    {:ok, _task} =
      Task.start(fn ->
        result = GenServer.call(worker, active_job.job)

        remove_job(active_job)
        free_worker(worker)

        send(active_job.reply_to, %{answer: result, job: active_job.job})

        start_next_job()
      end)

    {{:ok, active_job.id}, %{state | jobs: jobs, workers: workers}}
  end

  defp find_idle_worker(%{idle: [worker | _]}), do: {:ok, worker}
  defp find_idle_worker(%{idle: []}), do: {:error, :no_idle_workers}

  defp find_next_job(jobs), do: Enum.find(jobs, &(&1.status == :pending))

  # Puts the job `pid` was running (if any) back to :pending at the end of the
  # queue. A worker that dies while idle has no job, so `jobs` is unchanged.
  defp requeue_job_of(jobs, pid) do
    case Enum.find(jobs, &(&1.pid == pid)) do
      nil -> jobs
      job -> (jobs -- [job]) ++ [%{job | status: :pending, pid: nil}]
    end
  end

  defp free_worker(worker), do: GenServer.call(@job_queue_name, {:free_worker, worker})

  defp remove_job(job), do: GenServer.call(@job_queue_name, {:remove_job, job})
end

defmodule HeavyIndustry.Worker do
  @moduledoc """
  Stateless worker that performs one job per `GenServer.call/2`.

  Supported requests:

    * `{:sum, enumerable}` and `{:fib, n}`, which do real work.
    * `{:stop}`, `{:crash}` and `{:timeout}`, which exercise failure
      handling in `HeavyIndustry.JobQueue`.
  """
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # workers keep no state between jobs
    IO.puts "==> Worker up! #{inspect self()}"
    {:ok, nil}
  end

  # Sums any enumerable of numbers; an empty one sums to 0
  # (Enum.reduce/2 would raise Enum.EmptyError and crash the worker).
  def handle_call({:sum, numbers}, _from, _state) do
    {:reply, Enum.sum(numbers), nil}
  end

  def handle_call({:fib, n}, _from, _state) do
    {:reply, fib_calc(n), nil}
  end

  # Replies, then stops the worker with a non-normal reason.
  def handle_call({:stop}, _from, state) do
    {:stop, "my-stop-reason", "my-stop-reply", state}
  end

  # Deliberately crashes the worker to exercise the queue's recovery path.
  def handle_call({:crash}, _from, _state) do
    raise ArgumentError, "this will crash"
  end

  # Sleeps 10 s, longer than GenServer.call's default 5 s timeout, so the
  # caller times out.
  def handle_call({:timeout}, _from, _state) do
    :timer.sleep(10_000)
    {:reply, "this will timeout", nil}
  end

  # Deliberately slow, doubly recursive Fibonacci for CPU-heavy test jobs.
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n - 1) + fib_calc(n - 2)
end

defmodule Looper do
  @moduledoc """
  Demo driver for the job queue.

  It starts the supervisor and the queue with two workers, enqueues a fixed
  batch of jobs, and then prints every message this process receives.
  """

  @doc "Runs the demo. Never returns, because it ends in the infinite `loop/0`."
  def start do
    {:ok, sup} = HeavyIndustry.Supervisor.start_link()
    {:ok, _job_queue} = HeavyIndustry.Supervisor.create_children(sup, 2)
    HeavyIndustry.JobQueue.setup()
    add_jobs()
    loop()
  end

  @doc "Enqueues the demo jobs; their answers are sent to the calling process."
  def add_jobs do
    jobs = [
      {:sum, [100, 200, 300]},
      {:crash},
      {:fib, 35},
      {:fib, 35},
      {:sum, [88, 88, 99]},
      {:fib, 35},
      {:fib, 35},
      {:fib, 35},
      {:sum, 0..100},
      # {:stop}, # stop not really a failure

      {:sum, [88, 88, 99]},
      # {:timeout},
      {:sum, [-1]}
    ]

    Enum.each(jobs, fn job ->
      IO.puts "~~~~> Add job: #{inspect job}"

      case HeavyIndustry.JobQueue.add_job(self(), job) do
        {:ok, :started} -> IO.puts "~~~~> Started job immediately"
        {:ok, :pending} -> IO.puts "~~~~> Job in queue"
        other -> IO.puts "~~~~> ... val: #{inspect other}"
      end
    end)
  end

  @doc "Prints every message this process receives, forever."
  def loop do
    receive do
      value ->
        IO.puts "~~~~> Received: #{inspect value}"
        loop()
    end
  end
end

# Run the demo as soon as this script is evaluated. Looper.start ends in an
# infinite receive loop, so this call does not return.
Looper.start 
संबंधित मुद्दे