mod_s2s/s2sout: Use the local address assigned to UDP sockets instead of util.net...
[prosody.git] / plugins / mod_s2s / s2sout.lib.lua
1 -- Prosody IM
2 -- Copyright (C) 2008-2010 Matthew Wild
3 -- Copyright (C) 2008-2010 Waqas Hussain
4 -- 
5 -- This project is MIT/X11 licensed. Please see the
6 -- COPYING file in the source package for more information.
7 --
8
9 --- Module containing all the logic for connecting to a remote server
10
11 local portmanager = require "core.portmanager";
12 local wrapclient = require "net.server".wrapclient;
13 local initialize_filters = require "util.filters".initialize;
14 local idna_to_ascii = require "util.encodings".idna.to_ascii;
15 local new_ip = require "util.ip".new_ip;
16 local rfc6724_dest = require "util.rfc6724".destination;
17 local socket = require "socket";
18 local adns = require "net.adns";
19 local dns = require "net.dns";
20 local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
21
22 local s2s_destroy_session = require "core.s2smanager".destroy_session;
23
24 local log = module._log;
25
-- Wildcard address per protocol, used when no concrete local address can be determined
local anysource = { IPv4 = "0.0.0.0", IPv6 = "::" };

--- Collect the distinct local addresses that would be used to reach each destination.
-- For every destination a UDP socket is pointed at it with setpeername() and the
-- address the socket ended up bound to is read back with getsockname(). The
-- socket is closed again straight away.
-- @param addrs array of address objects carrying `.proto` ("IPv4"/"IPv6") and `.addr`
-- @return table that is both an array of new_ip() objects (one per distinct
--         local address) and a set keyed by the address string
local function get_sources(addrs)
	local sources = {};
	for _, IP in ipairs(addrs) do
		local proto = IP.proto;

		-- Socket creation can fail, and socket.udp6 is not guaranteed to exist
		-- (make_connect guards socket.tcp6 the same way), so `sock` may stay nil.
		local sock;
		if proto == "IPv4" then
			sock = socket.udp();
		elseif proto == "IPv6" and socket.udp6 then
			sock = socket.udp6();
		end

		local localaddr;
		if sock then
			sock:setpeername(IP.addr, 9);
			-- getsockname() may return several values; only the first is kept
			localaddr = sock:getsockname();
			sock:close();
		end
		-- Fall back to the wildcard address; this is nil for an unknown protocol,
		-- in which case the destination contributes no source at all
		localaddr = localaddr or anysource[proto];

		if localaddr and not sources[localaddr] then
			sources[localaddr] = true;
			t_insert(sources, new_ip(localaddr, proto));
		end
	end
	return sources;
end
-- Address families available for outgoing connections; both start out unset
-- and are switched on by the "service-added" handler at the end of this file
local has_ipv4;
local has_ipv6;

-- Resolver timeout comes from the "dns_timeout" option (default 15)
local dns_timeout = module:get_option_number("dns_timeout", 15);
dns.settimeout(dns_timeout);

-- Table of exported functions, returned at the end of the file
local s2sout = {};

-- Listener handed to wrapclient(); assigned through s2sout.set_listener()
local s2s_listener;
54
55
--- Store the listener that outgoing connections get wrapped with.
-- @param listener object later passed to wrapclient() and asked to
--        register_outgoing() each new connection
s2sout.set_listener = function (listener)
	s2s_listener = listener;
end
59
--- Ordering function for SRV records, suitable for table.sort.
-- Lower priority values sort first; among equal priorities the record
-- with the larger weight sorts first.
-- @param a SRV record with numeric `priority` and `weight`
-- @param b SRV record with numeric `priority` and `weight`
-- @return true when `a` must be placed before `b`
local function compare_srv_priorities(a, b)
	if a.priority < b.priority then
		return true;
	end
	if a.priority == b.priority then
		return a.weight > b.weight;
	end
	return false;
end
63
--- Prepare an outgoing session and start trying to connect it.
-- Sets up filters, marks the session as stream version 1, starts the
-- connection attempts and, if nothing has provided a `sends2s` yet, installs
-- one that queues outgoing data in `host_session.send_buffer`.
-- @param host_session the outgoing session to connect
function s2sout.initiate_connection(host_session)
	initialize_filters(host_session);
	host_session.version = 1;

	-- Kick the connection attempting machine into life
	local attempt_started = s2sout.attempt_connection(host_session);
	if not attempt_started then
		-- Deliberately no return here: the session is still
		-- needed by the code below, connected or not
		s2s_destroy_session(host_session);
	end

	if host_session.sends2s then
		return;
	end

	-- Buffering sends2s, used until the stream is opened. Anything queued here
	-- goes out before the stream is authenticated and is never acknowledged,
	-- whether it was delivered or not.
	local pending;
	host_session.sends2s = function (data)
		if pending == nil then
			pending = {};
			host_session.send_buffer = pending;
		end
		log("debug", "Buffering data on unconnected s2sout to %s", tostring(host_session.to_host));
		local slot = #pending + 1;
		pending[slot] = data;
		log("debug", "Buffered item %d: %s", #pending, tostring(data));
	end
end
91
--- Start, or continue, the search for a way to reach `host_session.to_host`.
-- Without `err` this is treated as the first attempt: an SRV lookup for
-- `_xmpp-server._tcp.<host>.` is started and the rest happens in its callback.
-- With `err` it is a retry: remaining IPs of the current target are tried
-- first (when `ip_hosts` is set), then the next SRV record.
-- @param host_session the outgoing session
-- @param err nil on the first attempt, otherwise the reason the previous attempt failed
-- @return true while an attempt is in progress; a false value when there is
--         nothing left to try (the result of try_connect is passed through on retries)
function s2sout.attempt_connection(host_session, err)
	local to_host = host_session.to_host;
	local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;

	if not connect_host then
		return false;
	end

	if not err then -- This is our first attempt
		log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
		host_session.connecting = true;
		adns.lookup(function (answer)
			host_session.connecting = nil;
			if answer and #answer > 0 then
				log("debug", "%s has SRV records, handling...", to_host);
				local srv_hosts = { answer = answer };
				host_session.srv_hosts = srv_hosts;
				for _, record in ipairs(answer) do
					t_insert(srv_hosts, record.srv);
				end
				if #srv_hosts == 1 and srv_hosts[1].target == "." then
					log("debug", "%s does not provide a XMPP service", to_host);
					-- `err` is always nil in this branch, so give an explicit reason
					-- instead of destroying the session without one
					s2s_destroy_session(host_session, "Remote host does not provide an XMPP service");
					return;
				end
				t_sort(srv_hosts, compare_srv_priorities);

				local srv_choice = srv_hosts[1];
				host_session.srv_choice = 1;
				if srv_choice then
					connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
					log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
				end
			else
				log("debug", "%s has no SRV records, falling back to A/AAAA", to_host);
			end
			-- Try with SRV, or just the plain hostname if no SRV
			local ok, connect_err = s2sout.try_connect(host_session, connect_host, connect_port);
			if not ok then
				-- A missing error would be read as "first attempt" and restart the
				-- SRV lookup, so always hand over a non-nil reason on this retry
				local reason = connect_err or "connection attempt failed";
				if not s2sout.attempt_connection(host_session, reason) then
					-- No more attempts will be made
					s2s_destroy_session(host_session, reason);
				end
			end
		end, "_xmpp-server._tcp."..connect_host..".", "SRV");

		return true; -- Attempt in progress
	elseif host_session.ip_hosts then
		-- Still working through the addresses of the current target
		return s2sout.try_connect(host_session, connect_host, connect_port, err);
	elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV
		host_session.srv_choice = host_session.srv_choice + 1;
		local srv_choice = host_session.srv_hosts[host_session.srv_choice];
		connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
		host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
	else
		host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host));
		-- We're out of options
		return false;
	end

	if not (connect_host and connect_port) then
		-- Likely we couldn't resolve DNS
		log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
		return false;
	end

	return s2sout.try_connect(host_session, connect_host, connect_port);
end
162
--- Advance to the next resolved address of the current target and connect to it.
-- If the connection cannot even be started, the failure is handed to
-- attempt_connection(); when that has nothing left to try the session is destroyed.
-- @param host_session outgoing session with `ip_hosts` and `ip_choice` set
function s2sout.try_next_ip(host_session)
	host_session.connecting = nil;

	local choice = host_session.ip_choice + 1;
	host_session.ip_choice = choice;

	local candidate = host_session.ip_hosts[choice];
	local ok, err = s2sout.make_connect(host_session, candidate.ip, candidate.port);
	if ok then
		return;
	end

	if s2sout.attempt_connection(host_session, err or "closed") then
		return;
	end

	local suffix = err and (": "..err) or "";
	s2s_destroy_session(host_session, "Connection failed"..suffix);
end
175
-- Runs once every pending A/AAAA lookup for a target has reported back.
-- With at least one address: orders the candidates, pairs each with the
-- port and starts connecting. With none: asks attempt_connection() for
-- another target and destroys the session if there is none.
local function finish_address_lookup(host_session, IPs, connect_host, connect_port, err)
	if #IPs > 0 then
		-- The return value is not used, so util.rfc6724's destination() is
		-- presumably expected to reorder the candidate list in place
		rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
		for i = 1, #IPs do
			IPs[i] = { ip = IPs[i], port = connect_port };
		end
		host_session.ip_choice = 0;
		s2sout.try_next_ip(host_session);
		return;
	end

	log("debug", "DNS lookup failed to get a response for %s", connect_host);
	host_session.ip_hosts = nil;
	if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can
		log("debug", "No other records to try for %s - destroying", host_session.to_host);
		err = err and (": "..err) or "";
		s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
	end
end

--- Resolve a target host and connect to it, or move on to its next address.
-- Without `err` this is a fresh target: A and/or AAAA lookups are started
-- (depending on which address families are available) and the connection is
-- made from their callbacks. With `err` the next entry of `ip_hosts` is tried,
-- or, when there are none left, attempt_connection() is asked for another target.
-- @param host_session the outgoing session
-- @param connect_host hostname to resolve
-- @param connect_port port to connect to
-- @param err nil for a fresh target, otherwise why the previous address failed
-- @return true when an attempt is under way; false (plus a reason when no
--         address family is available) when the session cannot proceed
function s2sout.try_connect(host_session, connect_host, connect_port, err)
	host_session.connecting = true;

	if not err then
		if not (has_ipv4 or has_ipv6) then
			-- No lookup would be started below, so nothing would ever move this
			-- session forward or destroy it. Report failure to the caller instead.
			log("warn", "No IPv4 or IPv6 support available for outgoing connection to %s", tostring(connect_host));
			host_session.connecting = nil;
			host_session.ip_hosts = nil;
			return false, "no usable address family";
		end

		local IPs = {};
		host_session.ip_hosts = IPs;
		-- With a single address family there is no second answer to wait for
		local have_other_result = not (has_ipv4 and has_ipv6);

		-- Shared handler for both record types; `field` is the record field
		-- holding the address ("a" or "aaaa") and `proto` the matching protocol
		local function on_reply(field, proto, reply, lookup_err)
			if reply and reply[#reply] and reply[#reply][field] then
				for _, record in ipairs(reply) do
					local addr = record[field];
					if addr then -- skip records that carry no address of this type
						log("debug", "DNS reply for %s gives us %s", connect_host, addr);
						IPs[#IPs+1] = new_ip(addr, proto);
					end
				end
			end

			if have_other_result then
				finish_address_lookup(host_session, IPs, connect_host, connect_port, lookup_err);
			else
				have_other_result = true;
			end
		end

		if has_ipv4 then
			adns.lookup(function (reply, lookup_err)
				on_reply("a", "IPv4", reply, lookup_err);
			end, connect_host, "A", "IN");
		end

		if has_ipv6 then
			adns.lookup(function (reply, lookup_err)
				on_reply("aaaa", "IPv6", reply, lookup_err);
			end, connect_host, "AAAA", "IN");
		end
		return true;
	elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
		s2sout.try_next_ip(host_session);
	else
		host_session.ip_hosts = nil;
		if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can
			log("debug", "No other records to try for %s - destroying", host_session.to_host);
			err = err and (": "..err) or "";
			s2s_destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't
			return false;
		end
	end

	return true;
end
271
--- Open a non-blocking TCP connection to one address and attach it to the session.
-- On success the socket is wrapped with the s2s listener, stored in
-- `host_session.conn`, a real `sends2s` replaces any earlier one, and the
-- connection is registered as outgoing with the listener.
-- @param host_session the outgoing session
-- @param connect_host address object with `.addr` and `.proto` ("IPv4"/"IPv6")
-- @param connect_port port number to connect to
-- @return true when the connection attempt was started, or false plus an error message
function s2sout.make_connect(host_session, connect_host, connect_port)
	-- The session may not carry its own logger; use one fallback consistently
	-- instead of calling host_session.log unguarded further down
	local session_log = host_session.log or log;
	session_log("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);

	-- Reset secure flag in case this is another
	-- connection attempt after a failed STARTTLS
	host_session.secure = nil;

	-- `handler` holds the error message whenever `conn` ends up nil
	local conn, handler;
	local proto = connect_host.proto;
	if proto == "IPv4" then
		conn, handler = socket.tcp();
	elseif proto == "IPv6" and socket.tcp6 then
		conn, handler = socket.tcp6();
	else
		handler = "Unsupported protocol: "..tostring(proto);
	end

	if not conn then
		log("warn", "Failed to create outgoing connection, system error: %s", handler);
		return false, handler;
	end

	conn:settimeout(0);
	local success, err = conn:connect(connect_host.addr, connect_port);
	-- "timeout" is not treated as a failure: with a zero timeout it is what
	-- connect() reports while the connection is still being established
	if not success and err ~= "timeout" then
		log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err);
		-- Close the socket explicitly so a failed attempt does not hold it open
		conn:close();
		return false, err;
	end

	conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, "*a");
	host_session.conn = conn;

	local filter = initialize_filters(host_session);
	local w = conn.write;
	host_session.sends2s = function (t)
		session_log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
		-- Only values with a `name` field go through the stanza filters
		if t.name then
			t = filter("stanzas/out", t);
		end
		-- A filter returning nothing drops the data
		if t then
			t = filter("bytes/out", tostring(t));
			if t then
				return w(conn, tostring(t));
			end
		end
	end

	-- Register this outgoing connection so that xmppserver_listener knows about it
	-- otherwise it will assume it is a new incoming connection
	s2s_listener.register_outgoing(conn, host_session);

	session_log("debug", "Connection attempt in progress...");
	return true;
end
326
-- Whenever the "s2s" service is added, look at the active s2s services and
-- record which address families are in use: a key containing ":" counts
-- as IPv6, anything else as IPv4.
module:hook_global("service-added", function (event)
	if event.name == "s2s" then
		local active = portmanager.get_active_services():get("s2s");
		if active then
			for source in pairs(active) do
				if source:find(":") then
					has_ipv6 = true;
				else
					has_ipv4 = true;
				end
			end
		else
			module:log("warn", "s2s not listening on any ports, outgoing connections may fail");
		end
	end
end);
343
344 return s2sout;