Supervisors and gen_servers

This commit is contained in:
Fabio Salvini 2017-06-11 18:47:34 +02:00
parent f0403bfa26
commit ae72db2769
7 changed files with 127 additions and 98 deletions

View File

@ -1,18 +1,25 @@
-module(coordinator).
-behaviour(supervisor).
-compile(export_all).
start() ->
_MailerPid = mailer:start(),
{ok, Files} = application:get_env(log_monitor, logfiles),
Monitors = [monitor:start(File) || File <- Files],
loop(Monitors).
start_link() ->
supervisor:start_link(?MODULE, []).
loop(Monitors) ->
receive
{'EXIT', FromPid, Reason} ->
io:format("Monitor process terminated: ~s~n", [Reason]),
RemainingMonitors = [M || M <- Monitors, M =/= FromPid],
loop(RemainingMonitors)
after 5000 ->
loop(Monitors)
end.
init([]) ->
SupFlags = #{strategy => one_for_one},
ChildSpecs = [#{
id => mailer,
start => {mailer, start_link, []},
restart => permanent,
shutdown => 5000
}] ++ monitors_child_specs(),
{ok, {SupFlags, ChildSpecs}}.
monitors_child_specs() ->
{ok, Files} = application:get_env(log_monitor, logfiles),
[#{
id => list_to_atom(File),
start => {monitor, start_link, [File]},
restart => temporary,
shutdown => 5000
} || File <- Files].

View File

@ -0,0 +1,49 @@
-module(gatherer).
-behaviour(gen_server).
-compile(export_all).
start_link(File) ->
gen_server:start_link(?MODULE, [File], []).
init([File]) ->
{ok, [off, File]}.
handle_info({log_line, Text}, [off, File]) ->
case isError(Text) of
true ->
{ok, Timer} = timer:send_after(1000, {timeout}),
{noreply, [on, File, Text, Timer]};
false -> {noreply, [off, File]}
end;
handle_info({log_line, Text}, [on, File, Error, Timer]) ->
case isError(Text) of
true ->
timer:clean(Timer),
{ok, NewTimer} = timer:send_after(1000, {timeout}),
{noreply, [on, File, Error ++ "\n" ++ Text, NewTimer]};
false ->
{noreply, [on, File, Error ++ "\n" ++ Text, Timer]}
end;
handle_info({timeout}, [on, File, Error, _Timer]) ->
mailer ! {error, File, Error},
{noreply, [off, File]}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_call(_Request, _From, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
shutdown.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
isError(Text) ->
case re:run(Text, "ERROR") of %% TODO
{match, _} ->
true;
nomatch ->
false
end.

View File

@ -29,8 +29,8 @@ start_link() ->
%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
init([]) ->
{ok, { {one_for_all, 0, 1}, [{console,
{coordinator, start, []},
permanent, 5000, worker, [monitor]}]} }.
{coordinator, start_link, []},
permanent, 5000, supervisor, [coordinator]}]} }.
%%====================================================================
%% Internal functions

View File

@ -1,25 +1,33 @@
-module(mailer).
-behaviour(gen_server).
-compile(export_all).
start() ->
io:format("Starting Mailer~n", []),
Pid = spawn_link(?MODULE, init, []),
register(mailer, Pid),
Pid.
start_link() ->
gen_server:start_link(?MODULE, [], []).
init() ->
loop().
init([]) ->
register(mailer, self()),
{ok, []}.
loop() ->
receive
{error, File, Text} ->
case send_email(File, Text) of
{error, Reason, Message} ->
io:format("Error sending email: ~s ~p~n", [Reason, Message]);
_ -> io:format("Mail sent~n", [])
end,
loop()
end.
handle_info({error, File, Text}, []) ->
case send_email(File, Text) of
{error, Reason, Message} ->
io:format("Error sending email: ~s ~p~n", [Reason, Message]);
_ -> io:format("Mail sent~n", [])
end,
{noreply, []}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_call(_Request, _From, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
shutdown.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
send_email(File, Text) ->
{ok, EmailConfig} = application:get_env(log_monitor, email_config),

View File

@ -1,59 +1,22 @@
-module(monitor).
-behaviour(supervisor).
-compile(export_all).
start(File) ->
io:format("Starting monitor for file: ~s~n", [File]),
Pid = spawn_link(?MODULE, init, [File]),
Pid.
start_link(File) ->
supervisor:start_link(?MODULE, [File]).
init(File) ->
process_flag(trap_exit, true),
startWatcher(File),
loop(File).
loop(File) ->
receive
{log_line, Text} ->
case isError(Text) of
true -> errorGathering(File, Text);
false -> loop(File)
end,
io:format("Received line: ~s~n", [Text]),
loop(File);
{'EXIT', _FromPid, Reason} ->
io:format("Watcher process terminated: ~s~n", [Reason]),
startWatcher(File),
loop(File)
end.
errorGathering(File, Error) ->
{ok, Timer} = timer:send_after(1000, {timeout}),
errorGathering(File, Error, Timer).
errorGathering(File, Error, Timer) ->
receive
{log_line, Text} ->
case isError(Text) of
true ->
timer:clean(Timer),
{ok, NewTimer} = timer:send_after(1000, {timeout}),
errorGathering(File, Error ++ "\n" ++ Text, NewTimer);
false ->
errorGathering(File, Error ++ "\n" ++ Text, Timer)
end;
{timeout} ->
mailer ! {error, File, Error},
loop(File)
end.
isError(Text) ->
case re:run(Text, "ERROR") of %% TODO
{match, _} ->
true;
nomatch ->
false
end.
startWatcher(File) ->
%% WatcherPid = spawn_link(watcher, init, [self(), File]),
WatcherPid = watcher:start_link(self(), File),
{ok, WatcherPid}.
SupFlags = #{strategy => one_for_one},
ChildSpecs = [#{
id => gatherer,
start => {gatherer, start_link, [File]},
restart => permanent,
shutdown => 5000
},
#{
id => watcher,
start => {watcher, start_link, [self(), File]},
restart => permanent,
shutdown => 5000
}],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -2,20 +2,20 @@
-behaviour(gen_server).
-compile(export_all).
start_link(MonitorPid, File) ->
gen_server:start_link(?MODULE, [MonitorPid, File], []).
start_link(SupPid, File) ->
gen_server:start_link(?MODULE, [SupPid, File], []).
init([MonitorPid, File]) ->
io:format("Init~n"),
init([SupPid, File]) ->
Cmd = "/usr/bin/tail -n0 --follow=name " ++ File,
Port = open_port({spawn, Cmd}, [stderr_to_stdout, exit_status, binary]),
{ok, [MonitorPid, Port]}.
{ok, [SupPid, Port]}.
handle_info(Msg, [MonitorPid, Port]) ->
handle_info(Msg, [SupPid, Port]) ->
case Msg of
{Port, {data, Text}} ->
MonitorPid ! {log_line, Text},
{noreply, [MonitorPid, Port]};
GathererPid = gatherer_pid(SupPid),
GathererPid ! {log_line, Text},
{noreply, [SupPid, Port]};
{Port, {exit_status, _Status}} ->
io:format("Watcher terminated~n"),
{stop, tail_exit, []}
@ -27,9 +27,11 @@ handle_cast(_Msg, State) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
terminate(Reason, _State) ->
io:format("Reason: ~s~n", [Reason]),
terminate(_Reason, _State) ->
shutdown.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
gatherer_pid(SupPid) ->
hd([Pid || {Id, Pid, _, _} <- supervisor:which_children(SupPid), Id == gatherer]).

View File

@ -1,7 +1,7 @@
[
{ log_monitor,
[
{logfiles, ["/tmp/lines.log"]},
{logfiles, ["/tmp/lines.log", "/tmp/lines2.log"]},
{email_config,
[
{sender, "me@example.com"},