वे कुंजी Process.monitor(pid)
बुला का एक संयोजन था - तो आप handle_info
पर कॉल प्राप्त होगा - और मैन्युअल रूप से बुला Supervisor.start_child
जो आप पीआईडी देता है।
मैंने पहले handle_info
का उपयोग करने की कोशिश की थी लेकिन इसे कभी भी कॉल नहीं किया जा सका। Process.monitor(pid)
को उसी प्रक्रिया से बुलाया जाना चाहिए जिसे आप अधिसूचनाएं प्राप्त करना चाहते हैं, इसलिए आपको इसे मॉनिटर को अपनी सर्वर प्रक्रिया से जोड़ने के लिए handle_call
फ़ंक्शन के अंदर से कॉल करना होगा। कोड को दूसरी प्रक्रिया के रूप में चलाने के लिए एक फ़ंक्शन हो सकता है (यानी run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)
) लेकिन मुझे कुछ भी नहीं मिला।
संलग्न नौकरी कतार का एक बहुत ही निष्क्रिय कार्यान्वयन है। मैं केवल एलीक्सिर में एक दिन हूं इसलिए कोड गन्दा और गैर-मूर्खतापूर्ण दोनों है लेकिन मैं इसे जोड़ रहा हूं क्योंकि इस विषय के आसपास उदाहरण कोड की कमी है।
HeavyIndustry.JobQueue
, handle_info
, create_new_worker
पर देखें। इस कोड के साथ एक स्पष्ट मुद्दा है: यह क्रैश होने पर श्रमिकों को पुनरारंभ करने में सक्षम है, लेकिन यह उस कोड से अगले नौकरी पर कतार शुरू करने में सक्षम नहीं है (GenServer.call
की आवश्यकता के कारण handle_info
, जो हमें डेडलॉक्स करता है)। मुझे लगता है कि आप प्रक्रिया को अलग करके इसे ठीक कर सकते हैं जो नौकरियों को ट्रैक करने वाली प्रक्रिया से नौकरियां शुरू करता है। यदि आप उदाहरण कोड चलाते हैं तो आप देखेंगे कि आखिरकार यह कतार में चल रहा है, भले ही कतार में कोई भी है (:crash
नौकरी)।
defmodule HeavyIndustry.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
# default to supervising nothing, we will add
supervise([], strategy: :one_for_one)
end
def create_children(supervisor, worker_count) do
# create the job queue. defaults to no workers
Supervisor.start_child(supervisor, worker(HeavyIndustry.JobQueue, [[supervisor, worker_count]]))
end
end
defmodule HeavyIndustry.JobQueue do
use GenServer
@job_queue_name __MODULE__
def start_link(args, _) do
GenServer.start_link(__MODULE__, args, name: @job_queue_name)
end
def init([supervisor, n]) do
# set some default state
state = %{
supervisor: supervisor,
max_workers: n,
jobs: [],
workers: %{
idle: [],
busy: []
}
}
{:ok, state}
end
def setup() do
# we want to be aware of worker failures. we hook into this by calling
# Process.monitor(pid), but this links the calling process with the monitored
# process. To make sure the calls come to US and not the process that called
# setup, we create the workers by passing a message to our server process
state = GenServer.call(@job_queue_name, :setup)
# gross passing the whole state back here to monitor but the monitoring must
# be started from the server process and we can't call GenServer.call from
# inside the :setup call else we deadlock.
workers = state.workers.idle
GenServer.call(@job_queue_name, {:monitor_pids, workers})
end
def add_job(from, job) do
# add job to queue
{:ok, our_job_id} = GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})
# try to run the next job
case GenServer.call(@job_queue_name, :start_next_job) do
# started our job
{:ok, started_job_id = ^our_job_id} -> {:ok, :started}
# started *a* job
{:ok, _} -> {:ok, :pending}
# couldnt start any job but its ok...
{:error, :no_idle_workers} -> {:ok, :pending}
# something fell over...
{:error, e} -> {:error, e}
# yeah I know this is bad.
_ -> {:ok}
end
end
def start_next_job do
GenServer.call(@job_queue_name, :start_next_job)
end
##
# Internal API
##
def handle_call(:setup, _, state) do
workers = Enum.map(0..(state.max_workers-1), fn (n) ->
{:ok, pid} = start_new_worker(state.supervisor)
pid
end)
state = %{state | workers: %{state.workers | idle: workers}}
{:reply, state, state}
end
defp start_new_worker(supervisor) do
spec = Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: :"Worker.#{:os.system_time}", restart: :temporary)
# start worker
Supervisor.start_child(supervisor, spec)
end
def handle_call({:monitor_pids, list}, _, state) do
Enum.each(list, &Process.monitor(&1))
{:reply, :ok, state}
end
def handle_call({:create_job, job}, from, state) do
job = %{
job: job.job,
reply_to: job.reply_to,
id: :os.system_time, # id for task
status: :pending, # start pending, go active, then remove
pid: nil
}
# add new job to jobs list
state = %{state | jobs: state.jobs ++ [job]}
{:reply, {:ok, job.id}, state}
end
def handle_call(:start_next_job, _, state) do
IO.puts "==> Start Next Job"
IO.inspect state
IO.puts "=================="
reply = case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
{{:error, :no_idle_workers}, _} ->
# no workers for job, doesnt matter if we have a job
{:error, :no_idle_workers}
{_, nil} ->
# no job, doesnt matter if we have a worker
{:error, :no_more_jobs}
{{:ok, worker}, job} ->
# have worker, have job, do work
# update state to set job active and worker busy
jobs = state.jobs -- [job]
job = %{job | status: :active, pid: worker}
jobs = jobs ++ [job]
idle = state.workers.idle -- [worker]
busy = state.workers.busy ++ [worker]
state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}
{:ok, task_id} = Task.start(fn ->
result = GenServer.call(worker, job.job)
remove_job(job)
free_worker(worker)
send job.reply_to, %{answer: result, job: job.job}
start_next_job
end)
{:ok, job.id}
end
{:reply, reply, state}
end
defp find_idle_worker(workers) do
case workers do
%{idle: [], busy: _} -> {:error, :no_idle_workers}
%{idle: [worker | idle], busy: busy} -> {:ok, worker}
end
end
defp find_next_job(jobs) do
jobs |> Enum.find(&(&1.status == :pending))
end
defp free_worker(worker) do
GenServer.call(@job_queue_name, {:free_worker, worker})
end
defp remove_job(job) do
GenServer.call(@job_queue_name, {:remove_job, job})
end
def handle_call({:free_worker, worker}, from, state) do
idle = state.workers.idle ++ [worker]
busy = state.workers.busy -- [worker]
{:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}
end
def handle_call({:remove_job, job}, from, state) do
jobs = state.jobs -- [job]
{:reply, :ok, %{state | jobs: jobs}}
end
def handle_info(msg = {reason, ref, :process, pid, _reason}, state) do
IO.puts "Worker collapsed: #{reason} #{inspect pid}, clear and restart job"
# find job for collapsed worker
# set job to pending again
job = Enum.find(state.jobs, &(&1.pid == pid))
fixed_job = %{job | status: :pending, pid: nil}
jobs = (state.jobs -- [job]) ++ [fixed_job]
# remote worker from lists
idle = state.workers.idle -- [pid]
busy = state.workers.busy -- [pid]
# start new worker
{:ok, pid} = start_new_worker(state.supervisor)
# add worker from lists
idle = state.workers.idle ++ [pid]
# cant call GenServer.call from here to monitor pid,
# so duplicate the code a bit...
Process.monitor(pid)
# update state
state = %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}
{:noreply, state}
end
end
defmodule HeavyIndustry.Worker do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, :ok)
end
def init(:ok) do
# workers have no persistent state
IO.puts "==> Worker up! #{inspect self}"
{:ok, nil}
end
def handle_call({:sum, list}, from, _) do
sum = Enum.reduce(list, fn (n, acc) -> acc + n end)
{:reply, sum, nil}
end
def handle_call({:fib, n}, from, _) do
sum = fib_calc(n)
{:reply, sum, nil}
end
def handle_call({:stop}, from, state) do
{:stop, "my-stop-reason", "my-stop-reply", state}
end
def handle_call({:crash}, from, _) do
{:reply, "this will crash" ++ 1234, nil}
end
def handle_call({:timeout}, from, _) do
:timer.sleep 10000
{:reply, "this will timeout", nil}
end
# Slow fib
defp fib_calc(0), do: 0
defp fib_calc(1), do: 1
defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end
defmodule Looper do
def start do
{:ok, pid} = HeavyIndustry.Supervisor.start_link
{:ok, job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)
HeavyIndustry.JobQueue.setup()
add_jobs
loop
end
def add_jobs do
jobs = [
{:sum, [100, 200, 300]},
{:crash},
{:fib, 35},
{:fib, 35},
{:sum, [88, 88, 99]},
{:fib, 35},
{:fib, 35},
{:fib, 35},
{:sum, 0..100},
# {:stop}, # stop not really a failure
{:sum, [88, 88, 99]},
# {:timeout},
{:sum, [-1]}
]
Enum.each(jobs, fn (job) ->
IO.puts "~~~~> Add job: #{inspect job}"
case HeavyIndustry.JobQueue.add_job(self, job) do
{:ok, :started} -> IO.puts "~~~~> Started job immediately"
{:ok, :pending} -> IO.puts "~~~~> Job in queue"
val -> IO.puts "~~~~> ... val: #{inspect val}"
end
end)
end
def loop do
receive do
value ->
IO.puts "~~~~> Received: #{inspect value}"
loop
end
end
end
Looper.start