Merge 0.9->0.10
[prosody.git] / plugins / mod_s2s / s2sout.lib.lua
1 -- Prosody IM
2 -- Copyright (C) 2008-2010 Matthew Wild
3 -- Copyright (C) 2008-2010 Waqas Hussain
4 --
5 -- This project is MIT/X11 licensed. Please see the
6 -- COPYING file in the source package for more information.
7 --
8
9 --- Module containing all the logic for connecting to a remote server
10
11 local portmanager = require "core.portmanager";
12 local wrapclient = require "net.server".wrapclient;
13 local initialize_filters = require "util.filters".initialize;
14 local idna_to_ascii = require "util.encodings".idna.to_ascii;
15 local new_ip = require "util.ip".new_ip;
16 local rfc6724_dest = require "util.rfc6724".destination;
17 local socket = require "socket";
18 local adns = require "net.adns";
19 local dns = require "net.dns";
20 local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
21
22 local s2s_destroy_session = require "core.s2smanager".destroy_session;
23
24 local log = module._log;
25
-- Wildcard local address per protocol, used when the real one cannot be determined
local anysource = { IPv4 = "0.0.0.0", IPv6 = "::" };

--- Collect the distinct local source addresses that would be used to reach
-- each of the given destination addresses.
-- @param addrs array of address objects; each is read for `.proto`
--        ("IPv4" or "IPv6") and `.addr`
-- @return array of address objects built with new_ip(); the same table also
--         has each address string as a key (value `true`), which is how
--         duplicates are detected
local function get_sources(addrs)
        local sources = {};
        for _, IP in ipairs(addrs) do
                local proto = IP.proto;

                -- Pick the UDP constructor for this family. socket.udp6 may be
                -- missing (old LuaSocket), and an unknown proto has none at all.
                local make_socket;
                if proto == "IPv4" then
                        make_socket = socket.udp;
                elseif proto == "IPv6" then
                        make_socket = socket.udp6;
                end

                local sock = make_socket and make_socket();
                local localaddr;
                if sock then
                        -- Associate the UDP socket with the destination and read back
                        -- the local address it ended up with. Nothing is sent here.
                        sock:setpeername(IP.addr, 9);
                        localaddr = sock:getsockname();
                        sock:close();
                end
                -- No socket, or no answer: fall back to the wildcard for this family
                localaddr = localaddr or anysource[proto];

                -- localaddr is still nil for an unknown proto; skip such entries
                -- rather than indexing the table with nil.
                if localaddr and not sources[localaddr] then
                        sources[localaddr] = true;
                        t_insert(sources, new_ip(localaddr, proto));
                end
        end
        return sources;
end
-- Address families usable for outgoing connections. Both start out unset
-- and are switched on by the "service-added" hook further down in this file.
local has_ipv4, has_ipv6;

-- Resolver timeout in seconds, configurable through the "dns_timeout" option
dns.settimeout(module:get_option_number("dns_timeout", 15));

-- Table of exported functions, returned at the end of the file
local s2sout = {};

-- Listener handed to wrapclient() for every outgoing connection
local s2s_listener;

--- Store the listener used by connections created in this module.
-- @param listener value later passed to wrapclient() and used for
--        register_outgoing()
s2sout.set_listener = function (listener)
        s2s_listener = listener;
end
59
--- Ordering function for SRV records, suitable for table.sort.
-- Records with a lower `priority` come first; among equal priorities the
-- record with the higher `weight` comes first.
-- @param a SRV record with `priority` and `weight` fields
-- @param b SRV record with `priority` and `weight` fields
-- @return true if `a` must be sorted before `b`
local function compare_srv_priorities(a, b)
        if a.priority ~= b.priority then
                return a.priority < b.priority;
        end
        return a.weight > b.weight;
end
63
--- Prepare an outgoing session and start connecting it.
-- Initializes the session's filters, sets its stream version to 1, starts
-- the connection attempts and, if the session has no `sends2s` yet, installs
-- one that buffers outgoing data.
-- @param host_session the outgoing session; `to_host` is read from it
function s2sout.initiate_connection(host_session)
        initialize_filters(host_session);
        host_session.version = 1;

        -- Kick off the connection attempts. On failure the session is
        -- destroyed, but we deliberately carry on: the session object is
        -- needed whether it is connected or not.
        local started = s2sout.attempt_connection(host_session);
        if not started then
                s2s_destroy_session(host_session);
        end

        if host_session.sends2s then
                return;
        end

        -- Buffering sends2s. Per the original author's note, data held here is
        -- sent before the stream is authenticated and is never acknowledged,
        -- whether or not it was delivered.
        local pending;
        host_session.sends2s = function (data)
                if not pending then
                        -- Created lazily and exposed on the session as send_buffer
                        pending = {};
                        host_session.send_buffer = pending;
                end
                log("debug", "Buffering data on unconnected s2sout to %s", tostring(host_session.to_host));
                pending[#pending+1] = data;
                log("debug", "Buffered item %d: %s", #pending, tostring(data));
        end
end
91
--- Start, or continue, the sequence of connection attempts for a session.
-- Without `err` this is the first attempt: an SRV lookup is started and the
-- rest happens in its callback. With `err` (the reason the previous attempt
-- failed) the next IP address is tried if an IP list exists, otherwise the
-- next SRV record.
-- @param host_session the outgoing session
-- @param err nil on the first attempt, otherwise the previous failure reason
-- @return true if an attempt is in progress (or whatever try_connect returns),
--         false if there is nothing left to try
function s2sout.attempt_connection(host_session, err)
        local to_host = host_session.to_host;
        local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;

        if not connect_host then
                return false;
        end

        if not err then -- This is our first attempt
                log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
                host_session.connecting = true;

                -- Runs when the SRV lookup completes
                local function on_srv_answer(answer)
                        local srv_hosts = { answer = answer };
                        host_session.srv_hosts = srv_hosts;
                        host_session.srv_choice = 0;
                        host_session.connecting = nil;

                        -- Default target is the plain hostname on the default port
                        local target_host, target_port = connect_host, connect_port;

                        if answer and #answer > 0 then
                                log("debug", "%s has SRV records, handling...", to_host);
                                for _, record in ipairs(answer) do
                                        t_insert(srv_hosts, record.srv);
                                end
                                -- A single record with target "." is treated as
                                -- "no XMPP service here"
                                if #srv_hosts == 1 and srv_hosts[1].target == "." then
                                        log("debug", "%s does not provide a XMPP service", to_host);
                                        s2s_destroy_session(host_session, err); -- Nothing to see here
                                        return;
                                end
                                t_sort(srv_hosts, compare_srv_priorities);

                                local best = srv_hosts[1];
                                host_session.srv_choice = 1;
                                if best then
                                        target_host, target_port = best.target or to_host, best.port or connect_port;
                                        log("debug", "Best record found, will connect to %s:%d", target_host, target_port);
                                end
                        else
                                log("debug", "%s has no SRV records, falling back to A/AAAA", to_host);
                        end

                        -- Try with SRV, or just the plain hostname if no SRV
                        local ok, connect_err = s2sout.try_connect(host_session, target_host, target_port);
                        if ok then
                                return;
                        end
                        if not s2sout.attempt_connection(host_session, connect_err) then
                                -- No more attempts will be made
                                s2s_destroy_session(host_session, connect_err);
                        end
                end

                adns.lookup(on_srv_answer, "_xmpp-server._tcp."..connect_host..".", "SRV");

                return true; -- Attempt in progress
        end

        -- Not the first attempt: an existing IP list takes precedence
        if host_session.ip_hosts then
                return s2sout.try_connect(host_session, connect_host, connect_port, err);
        end

        local srv_hosts = host_session.srv_hosts;
        if not (srv_hosts and #srv_hosts > host_session.srv_choice) then
                host_session.log("info", "Failed in all attempts to connect to %s", tostring(host_session.to_host));
                -- We're out of options
                return false;
        end

        -- Move on to the next SRV record
        local next_choice = host_session.srv_choice + 1;
        host_session.srv_choice = next_choice;
        local srv_choice = srv_hosts[next_choice];
        connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
        host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), next_choice, connect_host, connect_port);

        if not (connect_host and connect_port) then
                -- Likely we couldn't resolve DNS
                log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
                return false;
        end

        return s2sout.try_connect(host_session, connect_host, connect_port);
end
163
--- Advance to the next entry of the session's IP list and connect to it.
-- If the connection cannot even be started, fall back to
-- attempt_connection(); if that has nothing left either, the session is
-- destroyed.
-- @param host_session the outgoing session; `ip_hosts` and `ip_choice`
--        must already be set
function s2sout.try_next_ip(host_session)
        host_session.connecting = nil;

        local choice = host_session.ip_choice + 1;
        host_session.ip_choice = choice;
        local candidate = host_session.ip_hosts[choice];

        local ok, err = s2sout.make_connect(host_session, candidate.ip, candidate.port);
        if ok then
                return;
        end

        if s2sout.attempt_connection(host_session, err or "closed") then
                return;
        end

        -- Nothing left to try: report the failure, with the reason if we have one
        local reason = err and (": "..err) or "";
        s2s_destroy_session(host_session, "Connection failed"..reason);
end
176
-- Runs once every A/AAAA lookup issued by try_connect has reported back:
-- orders the collected addresses and starts connecting, or falls back to
-- the next SRV record when no address was found.
local function finish_ip_lookup(host_session, IPs, connect_host, connect_port, err)
        if #IPs > 0 then
                rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
                -- Replace each address with an {ip, port} pair for try_next_ip
                for i = 1, #IPs do
                        IPs[i] = {ip = IPs[i], port = connect_port};
                end
                host_session.ip_choice = 0;
                s2sout.try_next_ip(host_session);
                return;
        end

        log("debug", "DNS lookup failed to get a response for %s", connect_host);
        host_session.ip_hosts = nil;
        if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
                log("debug", "No other records to try for %s - destroying", host_session.to_host);
                err = err and (": "..err) or "";
                s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
        end
end

--- Resolve a host and connect to it, or continue with an existing IP list.
-- Without `err` this is a fresh target: A and/or AAAA lookups are started
-- (depending on has_ipv4/has_ipv6) and connecting continues from their
-- callbacks. With `err` the next address of the existing list is tried, or,
-- when the list is exhausted, control goes back to attempt_connection().
-- @param host_session the outgoing session
-- @param connect_host hostname to resolve
-- @param connect_port port to connect to
-- @param err nil for a fresh target, otherwise the previous failure reason
-- @return true if an attempt is in progress; false (plus an error string in
--         the no-address-family case) if nothing more can be done
function s2sout.try_connect(host_session, connect_host, connect_port, err)
        host_session.connecting = true;

        if not err then
                if not (has_ipv4 or has_ipv6) then
                        -- With neither family enabled no lookup would be issued and the
                        -- session would wait forever. Fail instead, with a non-nil error
                        -- so that callers do not mistake this for a first attempt.
                        log("warn", "No IPv4 or IPv6 support detected for outgoing s2s, cannot connect to %s", tostring(connect_host));
                        host_session.ip_hosts = nil;
                        return false, "no usable address family";
                end

                local IPs = {};
                host_session.ip_hosts = IPs;

                -- Number of lookups that still have to report back
                local pending = (has_ipv4 and 1 or 0) + (has_ipv6 and 1 or 0);

                -- Issue one lookup; `field` is the record key holding the address
                local function lookup(qtype, field, proto)
                        adns.lookup(function (reply, lookup_err)
                                if reply and reply[#reply] and reply[#reply][field] then
                                        for _, record in ipairs(reply) do
                                                local addr = record[field];
                                                if addr then -- skip records without an address
                                                        log("debug", "DNS reply for %s gives us %s", connect_host, addr);
                                                        IPs[#IPs+1] = new_ip(addr, proto);
                                                end
                                        end
                                end

                                pending = pending - 1;
                                if pending == 0 then
                                        finish_ip_lookup(host_session, IPs, connect_host, connect_port, lookup_err);
                                end
                        end, connect_host, qtype, "IN");
                end

                if has_ipv4 then
                        lookup("A", "a", "IPv4");
                end
                if has_ipv6 then
                        lookup("AAAA", "aaaa", "IPv6");
                end
                return true;
        elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
                s2sout.try_next_ip(host_session);
        else
                host_session.ip_hosts = nil;
                if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can
                        log("debug", "No other records to try for %s - destroying", host_session.to_host);
                        err = err and (": "..err) or "";
                        s2s_destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't
                        return false;
                end
        end

        return true;
end
272
--- Open a non-blocking TCP connection to one address and attach it to the session.
-- @param host_session the outgoing session; its `secure` and `encrypted`
--        flags are cleared and `conn` is set on success
-- @param connect_host address object, read for `.addr` and `.proto`
-- @param connect_port port number to connect to
-- @return true when the connection attempt has been started, or
--         false and an error string when it could not be
function s2sout.make_connect(host_session, connect_host, connect_port)
        (host_session.log or log)("debug", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);

        -- Reset secure flag in case this is another
        -- connection attempt after a failed STARTTLS
        host_session.secure = nil;
        host_session.encrypted = nil;

        -- `handler` carries the error message when no socket could be created
        local conn, handler;
        local proto = connect_host.proto;
        if proto == "IPv4" then
                conn, handler = socket.tcp();
        elseif proto == "IPv6" and socket.tcp6 then
                conn, handler = socket.tcp6();
        else
                handler = "Unsupported protocol: "..tostring(proto);
        end

        if not conn then
                log("warn", "Failed to create outgoing connection, system error: %s", handler);
                return false, handler;
        end

        -- Non-blocking connect: "timeout" is not treated as a failure here,
        -- the attempt is considered to be in progress.
        conn:settimeout(0);
        local success, err = conn:connect(connect_host.addr, connect_port);
        if not success and err ~= "timeout" then
                log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err);
                -- Release the socket now instead of waiting for the garbage collector
                conn:close();
                return false, err;
        end

        conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, "*a");
        host_session.conn = conn;

        -- Register this outgoing connection so that xmppserver_listener knows about it
        -- otherwise it will assume it is a new incoming connection
        s2s_listener.register_outgoing(conn, host_session);

        log("debug", "Connection attempt in progress...");
        return true;
end
313
-- When the "s2s" service is added, look at the addresses it is active on and
-- record which address families are in use. An address containing ":" counts
-- as IPv6, anything else as IPv4. The flags are only ever switched on here.
module:hook_global("service-added", function (event)
        if event.name == "s2s" then
                local active = portmanager.get_active_services():get("s2s");
                if active then
                        for source in pairs(active) do
                                if source:find(":", 1, true) then
                                        has_ipv6 = true;
                                else
                                        has_ipv4 = true;
                                end
                        end
                else
                        module:log("warn", "s2s not listening on any ports, outgoing connections may fail");
                end
        end
end);

return s2sout;