+
Skip to content

Resilient verk improvements #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits on Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions lib/verk/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ defmodule Verk.Manager do
@doc false
def init(queues) do
ets = :ets.new(@table, @ets_options)
local_verk_node_id = Application.fetch_env!(:verk, :local_node_id)

for {queue, size} <- queues do
:ets.insert_new(@table, {queue, size, :running})
Verk.Node.add_queue!(local_verk_node_id, queue, Verk.Redis)
end

{:ok, ets}
Expand Down Expand Up @@ -75,8 +73,6 @@ defmodule Verk.Manager do
Logger.error("Queue #{queue} is already running")
end

local_verk_node_id = Application.fetch_env!(:verk, :local_node_id)
Verk.Node.add_queue!(local_verk_node_id, queue, Verk.Redis)
Verk.Manager.Supervisor.start_child(queue, size)
end

Expand All @@ -87,8 +83,6 @@ defmodule Verk.Manager do
@spec remove(atom) :: :ok | {:error, :not_found}
def remove(queue) do
  # Forget the queue in the local ETS registry first.
  :ets.delete(@table, queue)

  # Untrack the queue in Redis for this node (Verk.Node.remove_queue!/3 issues an SREM).
  :verk
  |> Application.fetch_env!(:local_node_id)
  |> Verk.Node.remove_queue!(queue, Verk.Redis)

  # The reply of the supervisor call is what callers receive.
  Verk.Manager.Supervisor.stop_child(queue)
end
end
49 changes: 26 additions & 23 deletions lib/verk/node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,6 @@ defmodule Verk.Node do

@verk_nodes_key "verk_nodes"

@doc """
Adds `verk_node_id` to the `verk_nodes` set and marks the node alive for `ttl` milliseconds.

Returns `:ok` when the id was newly added to the set, or
`{:error, :node_id_already_running}` when it was already a member.
"""
@spec register(String.t(), non_neg_integer, GenServer.t()) ::
        :ok | {:error, :node_id_already_running}
def register(verk_node_id, ttl, redis) do
  # Both commands are pipelined unconditionally, so the alive key's TTL is
  # refreshed even when the id turns out to be registered already.
  case Redix.pipeline!(redis, [
         ["SADD", @verk_nodes_key, verk_node_id],
         ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"]
       ]) do
    # SADD replies 1 only when the member was not in the set before.
    [1, _] -> :ok
    _ -> {:error, :node_id_already_running}
  end
end

@spec deregister!(String.t(), GenServer.t()) :: :ok
def deregister!(verk_node_id, redis) do
Redix.pipeline!(redis, [
Expand All @@ -29,11 +17,12 @@ defmodule Verk.Node do
end

@doc """
Fetches one page of the `verk_nodes` set with SSCAN, starting at `cursor`.

`count` is passed to Redis as the COUNT hint. Returns:

  * `{:ok, nodes}` when Redis reports the scan is finished (cursor `"0"`),
  * `{:more, nodes, next_cursor}` when another page must be requested with `next_cursor`,
  * `{:error, reason}` when the Redis command fails.

The cursor returned by Redis is a string; it can be passed back as-is.
"""
@spec members(non_neg_integer | String.t(), non_neg_integer, GenServer.t()) ::
        {:ok, [String.t()]} | {:more, [String.t()], String.t()} | {:error, term}
def members(cursor \\ 0, count \\ 25, redis) do
  case Redix.command(redis, ["SSCAN", @verk_nodes_key, cursor, "COUNT", count]) do
    {:ok, ["0", verk_nodes]} -> {:ok, verk_nodes}
    {:ok, [next_cursor, verk_nodes]} -> {:more, verk_nodes, next_cursor}
    {:error, reason} -> {:error, reason}
  end
end

Expand All @@ -42,9 +31,9 @@ defmodule Verk.Node do
Redix.command!(redis, ["PTTL", verk_node_key(verk_node_id)])
end

@doc """
Sets the alive key of `verk_node_id` with a TTL of `ttl` milliseconds (PSETEX).

Returns the raw Redis reply (`"OK"`) and raises if the command fails.
"""
@spec expire_in!(String.t(), integer, GenServer.t()) :: String.t()
def expire_in!(verk_node_id, ttl, redis) do
  Redix.command!(redis, ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"])
end

@doc """
Non-raising variant of `expire_in!/3`.

Returns `{:ok, "OK"}` on success or `{:error, reason}` when the command fails.
"""
@spec expire_in(String.t(), integer, GenServer.t()) :: {:ok, String.t()} | {:error, term}
def expire_in(verk_node_id, ttl, redis) do
  Redix.command(redis, ["PSETEX", verk_node_key(verk_node_id), ttl, "alive"])
end

@spec queues!(String.t(), integer, non_neg_integer, GenServer.t()) ::
Expand All @@ -62,12 +51,26 @@ defmodule Verk.Node do
end
end

@doc """
Adds `queue` to the set of queues tracked for `verk_node_id`. Raises on Redis errors.
"""
def add_queue!(verk_node_id, queue, redis) do
  Redix.command!(redis, add_queue_redis_command(verk_node_id, queue))
end

@doc """
Removes `queue` from the set of queues tracked for `verk_node_id`. Raises on Redis errors.
"""
def remove_queue!(verk_node_id, queue, redis) do
  Redix.command!(redis, ["SREM", verk_node_queues_key(verk_node_id), queue])
end

@doc """
Redis command to add a queue to the set of queues that a node is processing

    iex> Verk.Node.add_queue_redis_command("123", "default")
    ["SADD", "verk:node:123:queues", "default"]
"""
@spec add_queue_redis_command(String.t(), String.t()) :: [String.t()]
def add_queue_redis_command(verk_node_id, queue) do
  ["SADD", verk_node_queues_key(verk_node_id), queue]
end

@doc """
Redis command to add a node to the set of registered verk nodes

    iex> Verk.Node.add_node_redis_command("123")
    ["SADD", "verk_nodes", "123"]
"""
@spec add_node_redis_command(String.t()) :: [String.t()]
def add_node_redis_command(verk_node_id) do
  ["SADD", @verk_nodes_key, verk_node_id]
end

# Redis key holding the "alive" marker for a node, e.g. "verk:node:123".
defp verk_node_key(verk_node_id) do
  "verk:node:#{verk_node_id}"
end
Expand Down
42 changes: 29 additions & 13 deletions lib/verk/node/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule Verk.Node.Manager do
"Node Manager started for node #{local_verk_node_id}. Heartbeat will run every #{frequency} milliseconds"
)

:ok = Verk.Node.register(local_verk_node_id, 2 * frequency, Verk.Redis)
heartbeat(local_verk_node_id, frequency)

Process.send_after(self(), :heartbeat, frequency)
Process.flag(:trap_exit, true)
Verk.Scripts.load(Verk.Redis)
Expand All @@ -28,20 +29,37 @@ defmodule Verk.Node.Manager do

@doc false
def handle_info(:heartbeat, state = {local_verk_node_id, frequency}) do
  # Refresh this node's own alive key first; heartbeat/2 logs failures instead of raising.
  heartbeat(local_verk_node_id, frequency)

  case find_faulty_nodes(local_verk_node_id) do
    {:ok, faulty_nodes} ->
      for faulty_verk_node_id <- faulty_nodes do
        Logger.warn("Verk Node #{faulty_verk_node_id} seems to be down. Restoring jobs!")

        cleanup_queues(faulty_verk_node_id)

        Verk.Node.deregister!(faulty_verk_node_id, Verk.Redis)
      end

    {:error, reason} ->
      Logger.error("Failed while looking for faulty nodes. Reason: #{inspect(reason)}")
  end

  # Schedule exactly one follow-up tick; scheduling it twice would make the
  # number of pending :heartbeat messages grow on every tick.
  Process.send_after(self(), :heartbeat, frequency)
  {:noreply, state}
end

# Refreshes this node's alive key with a TTL of twice the heartbeat frequency.
# A Redis failure is logged instead of raised.
defp heartbeat(local_verk_node_id, frequency) do
  ttl = 2 * frequency
  result = Verk.Node.expire_in(local_verk_node_id, ttl, Verk.Redis)

  case result do
    {:error, reason} ->
      Logger.error(
        "Failed to heartbeat node '#{local_verk_node_id}'. Reason: #{inspect(reason)}"
      )

    {:ok, _} ->
      :ok
  end
end

# Shutdown-tagged exits delegate to do_terminate/2 with this node's id; the
# second element of the state tuple (the heartbeat frequency) is not needed.
def terminate({:shutdown, _} = reason, {verk_node_id, _frequency}) do
  do_terminate(reason, verk_node_id)
end
Expand Down Expand Up @@ -82,11 +100,14 @@ defmodule Verk.Node.Manager do
# Walks every page of the `verk_nodes` set (SSCAN) starting at `cursor` and
# collects the ids selected by do_find_faulty_nodes/2.
#
# Returns {:ok, node_ids} or the first {:error, reason} reported by Redis. Every
# branch returns a tagged tuple, so pages are combined only after unwrapping the
# recursive result; concatenating a list with {:ok, _} would build an improper
# list that callers cannot match. SSCAN may return an element more than once,
# so the ids are de-duplicated.
defp find_faulty_nodes(local_verk_node_id, cursor \\ 0) do
  case Verk.Node.members(cursor, Verk.Redis) do
    {:ok, verk_nodes} ->
      {:ok, Enum.uniq(do_find_faulty_nodes(verk_nodes, local_verk_node_id))}

    {:more, verk_nodes, next_cursor} ->
      with {:ok, remaining} <- find_faulty_nodes(local_verk_node_id, next_cursor) do
        page = do_find_faulty_nodes(verk_nodes, local_verk_node_id)
        {:ok, Enum.uniq(page ++ remaining)}
      end

    {:error, reason} ->
      {:error, reason}
  end
end

Expand All @@ -96,11 +117,6 @@ defmodule Verk.Node.Manager do
end)
end

# Refreshes this node's alive key (TTL = 2 * frequency) through the raising
# Verk.Node.expire_in!/3, then schedules the next :heartbeat message.
defp heartbeat!(local_verk_node_id, frequency) do
  ttl = 2 * frequency
  Verk.Node.expire_in!(local_verk_node_id, ttl, Verk.Redis)

  Process.send_after(self(), :heartbeat, frequency)
end

defp enqueue_inprogress(node_id, queue) do
case InProgressQueue.enqueue_in_progress(queue, node_id, Verk.Redis) do
{:ok, [0, m]} ->
Expand Down
57 changes: 43 additions & 14 deletions lib/verk/queue_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Verk.QueueManager do

defmodule State do
  @moduledoc false
  # track_node_id defaults to false rather than nil: the dequeue handlers match
  # on `track_node_id: false` / `track_node_id: true`, and a nil value would
  # match neither clause.
  defstruct [:queue_name, :redis, :node_id, track_node_id: false]
end

@doc """
Expand Down Expand Up @@ -80,7 +80,14 @@ defmodule Verk.QueueManager do
{:ok, redis} = Redix.start_link(Confex.get_env(:verk, :redis_url))
Verk.Scripts.load(redis)

state = %State{queue_name: queue_name, redis: redis, node_id: node_id}
track_node_id = Application.get_env(:verk, :generate_node_id, false)

state = %State{
queue_name: queue_name,
redis: redis,
node_id: node_id,
track_node_id: track_node_id
}

Logger.info("Queue Manager started for queue #{queue_name}")
{:ok, state}
Expand Down Expand Up @@ -118,18 +125,8 @@ defmodule Verk.QueueManager do
end
end

def handle_call({:dequeue, n}, _from, state) do
case Redix.command(state.redis, [
"EVALSHA",
@mrpop_lpush_src_dest_script_sha,
2,
"queue:#{state.queue_name}",
inprogress(state.queue_name, state.node_id),
min(@max_jobs, n)
]) do
{:ok, []} ->
{:reply, [], state}

def handle_call({:dequeue, n}, _from, state = %State{track_node_id: false}) do
case Redix.command(state.redis, mrpop_lpush_src_dest(state.node_id, state.queue_name, n)) do
{:ok, jobs} ->
{:reply, jobs, state}

Expand All @@ -142,6 +139,27 @@ defmodule Verk.QueueManager do
end
end

def handle_call({:dequeue, n}, _from, state = %State{track_node_id: true}) do
  # Register the node, track the queue and move up to n jobs to the in-progress
  # list inside one MULTI/EXEC transaction.
  commands = [
    ["MULTI"],
    Verk.Node.add_node_redis_command(state.node_id),
    Verk.Node.add_queue_redis_command(state.node_id, state.queue_name),
    mrpop_lpush_src_dest(state.node_id, state.queue_name, n),
    ["EXEC"]
  ]

  # Redix.pipeline/3 reports Redis errors inside the reply list, not as
  # {:error, _}, so they are normalized here. The reply is
  # [MULTI, QUEUED, QUEUED, QUEUED, EXEC]. EXEC holds one result per queued
  # command; the jobs are the third. If the transaction was aborted, EXEC is
  # itself an error.
  result =
    case Redix.pipeline(state.redis, commands) do
      {:ok, [_multi, _, _, _, [_, _, %Redix.Error{} = error]]} -> {:error, error}
      {:ok, [_multi, _, _, _, %Redix.Error{} = error]} -> {:error, error}
      {:ok, [_multi, _, _, _, [_node_added, _queue_added, jobs]]} -> {:ok, jobs}
      {:error, reason} -> {:error, reason}
    end

  case result do
    {:ok, jobs} ->
      {:reply, jobs, state}

    {:error, %Redix.Error{message: message}} ->
      Logger.error("Failed to fetch jobs: #{message}")
      {:stop, :redis_failed, :redis_failed, state}

    {:error, _} ->
      {:reply, :redis_failed, state}
  end
end

def handle_call({:retry, job, failed_at, exception, stacktrace}, _from, state) do
retry_count = (job.retry_count || 0) + 1
job = build_retry_job(job, retry_count, failed_at, exception, stacktrace)
Expand Down Expand Up @@ -215,4 +233,15 @@ defmodule Verk.QueueManager do
end

defp format_stacktrace(stacktrace), do: inspect(stacktrace)

# Builds the EVALSHA command that runs the mrpop_lpush_src_dest script.
# The two keys are the source queue and this node's in-progress list, and
# at most @max_jobs jobs are requested.
defp mrpop_lpush_src_dest(node_id, queue_name, n) do
  source = "queue:#{queue_name}"
  destination = inprogress(queue_name, node_id)
  batch_size = min(@max_jobs, n)

  ["EVALSHA", @mrpop_lpush_src_dest_script_sha, 2, source, destination, batch_size]
end
end
32 changes: 29 additions & 3 deletions test/integration/test/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ defmodule IntegrationTest do
end
end

setup do
  # Runs before every test (not once per module) because tests change the
  # node-id configuration persistently, e.g. enabling :generate_node_id.
  Application.put_env(:verk, :generate_node_id, false, persistent: true)
  Application.delete_env(:verk, :local_node_id, persistent: true)
  {:ok, redis} = Redix.start_link(Confex.get_env(:verk, :redis_url))
  Redix.command!(redis, ["FLUSHDB"])
  {:ok, redis: redis}
end

@tag integration: true
test "shutdown", %{redis: redis} do
defp enqueue_jobs!(redis) do
for _x <- 0..10 do
Verk.enqueue(
%Verk.Job{queue: "queue_one", class: Integration.SleepWorker, args: [1_500]},
Expand All @@ -37,9 +38,34 @@ defmodule IntegrationTest do
redis
)
end
end

@tag integration: true
test "shutdown gracefully stops queues", %{redis: redis} do
  enqueue_jobs!(redis)

  Application.ensure_all_started(:integration)
  {:ok, _} = Consumer.start()

  # No node id is expected in the verk_nodes set for this configuration.
  assert [] == Redix.command!(redis, ["SMEMBERS", "verk_nodes"])

  Application.stop(:integration)

  # Each queue must report both pausing and paused events.
  assert_receive %Verk.Events.QueuePausing{queue: :queue_one}
  assert_receive %Verk.Events.QueuePausing{queue: :queue_two}
  assert_receive %Verk.Events.QueuePaused{queue: :queue_one}
  assert_receive %Verk.Events.QueuePaused{queue: :queue_two}
end

@tag integration: true
test "generate_node_id true maintains verk_nodes", %{redis: redis} do
enqueue_jobs!(redis)

Application.put_env(:verk, :generate_node_id, true, persistent: true)
Application.ensure_all_started(:integration)
{:ok, _consumer} = Consumer.start()
node_id = Application.fetch_env!(:verk, :local_node_id)
assert Redix.command!(redis, ["SMEMBERS", "verk_nodes"]) == [node_id]
assert Redix.command!(redis, ["TTL", "verk:node:#{node_id}"]) > 0

Application.stop(:integration)

assert_receive %Verk.Events.QueuePausing{queue: :queue_one}
Expand Down
5 changes: 0 additions & 5 deletions test/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ defmodule Verk.ManagerTest do
describe "init/1" do
test "creates an ETS table with queues" do
queues = [default: 25, low_priority: 10]
expect(Verk.Node, :add_queue!, [@node_id, :default, Verk.Redis], :ok)
expect(Verk.Node, :add_queue!, [@node_id, :low_priority, Verk.Redis], :ok)
init(queues)

assert :ets.tab2list(:verk_manager) == [
Expand Down Expand Up @@ -128,7 +126,6 @@ defmodule Verk.ManagerTest do
init_table([])

expect(Verk.Manager.Supervisor, :start_child, [:default, 25], {:ok, :child})
expect(Verk.Node, :add_queue!, [@node_id, :default, Verk.Redis], :ok)

assert add(:default, 25) == {:ok, :child}
assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}]
Expand All @@ -142,7 +139,6 @@ defmodule Verk.ManagerTest do
init_table(queues)

expect(Verk.Manager.Supervisor, :stop_child, [:default], :ok)
expect(Verk.Node, :remove_queue!, [@node_id, :default, Verk.Redis], :ok)

assert remove(:default) == :ok
assert :ets.tab2list(:verk_manager) == [{:low_priority, 10, :running}]
Expand All @@ -154,7 +150,6 @@ defmodule Verk.ManagerTest do
init_table(queues)

expect(Verk.Manager.Supervisor, :stop_child, [:default], {:error, :not_found})
expect(Verk.Node, :remove_queue!, [@node_id, :default, Verk.Redis], :ok)

assert remove(:default) == {:error, :not_found}
assert validate(Verk.Manager.Supervisor)
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载