00902283 |
1 | #!/usr/bin/env python |
2 | # coding: utf-8 |
3 | |
4 | """ |
5 | A pure python ping implementation using raw sockets. |
6 | |
7 | Note that ICMP messages can only be sent from processes running as root |
8 | (on Windows, you must run this script as 'Administrator'). |
9 | |
10 | Bugs are naturally mine. I'd be glad to hear about them. There are |
11 | certainly word-size dependencies here. |
12 | |
13 | :homepage: https://github.com/jedie/python-ping/ |
14 | :copyleft: 1989-2011 by the python-ping team, see AUTHORS for more details. |
15 | :license: GNU GPL v2, see LICENSE for more details. |
16 | """ |
17 | |
18 | |
19 | import array |
20 | import os |
21 | import select |
22 | import signal |
23 | import socket |
24 | import struct |
25 | import sys |
26 | import time |
27 | |
28 | |
# Pick the clock used to time round trips.
#
# time.perf_counter() (Python 3.3+) is monotonic and high-resolution on every
# platform, so it is preferred whenever it exists.  time.clock() was removed
# in Python 3.8, so referencing it unconditionally on Windows would raise
# AttributeError at import time there.
default_timer = getattr(time, "perf_counter", None)
if default_timer is None:
    if sys.platform.startswith("win32"):
        # On old Windows interpreters, the best timer is time.clock()
        default_timer = time.clock
    else:
        # On most other platforms the best timer is time.time()
        default_timer = time.time
35 | |
36 | |
# ICMP parameters
ICMP_ECHOREPLY = 0 # Echo reply (per RFC792)
ICMP_ECHO = 8 # Echo request (per RFC792)
ICMP_MAX_RECV = 2048 # Max size of incoming buffer (passed to recvfrom())

# Pause between two consecutive pings, in milliseconds.  Ping.run() sleeps
# for whatever is left of this period once the measured round-trip delay
# has been subtracted.
MAX_SLEEP = 1000
43 | |
44 | |
def calculate_checksum(source_string):
    """
    Compute the 16-bit Internet checksum (as used by ICMP) of a byte string.

    A port of the functionality of in_cksum() from ping.c.  The data is
    read as a series of big-endian (network order) 16-bit words, so the
    result does not depend on the byte order of the host.

    :param source_string: the packet bytes (``bytes``/``bytearray``; a text
        string is accepted too and is encoded as latin-1, one byte per
        character).  Odd-length input is padded with one zero byte.
    :return: the checksum as an int in the range 0..0xFFFF, ready to be
        packed with ``struct.pack("!H", ...)``.
    """
    if not isinstance(source_string, (bytes, bytearray)):
        # Text input: map every character to the byte of the same value.
        source_string = source_string.encode("latin-1")
    data = bytes(source_string)

    if len(data) % 2:
        data += b"\x00"  # pad to a whole number of 16-bit words

    total = sum(struct.unpack("!%dH" % (len(data) // 2), data))

    # Fold the carries back into the low 16 bits until none are left
    # (one's-complement addition).
    while total >> 16:
        total = (total & 0xffff) + (total >> 16)

    return ~total & 0xffff  # Invert and truncate to 16 bits
68 | |
69 | |
def is_valid_ip4_address(addr):
    """
    Tell whether *addr* is a dotted-quad IPv4 address.

    The string must consist of exactly four parts separated by dots, and
    every part must be a non-empty run of ASCII digits whose value is at
    most 255.  Signs and surrounding whitespace are rejected even though
    int() would parse them, so "1.2.3.-4" is not an address.

    :param addr: the candidate address string.
    :return: True if *addr* looks like an IPv4 address, False otherwise.
    """
    parts = addr.split(".")
    if len(parts) != 4:
        return False
    for part in parts:
        # strip() leaves something behind as soon as a non-digit is present.
        if not part or part.strip("0123456789"):
            return False
        if int(part) > 255:
            return False
    return True
82 | |
def to_ip(addr):
    """
    Return an IPv4 address for *addr*.

    A string that already is a dotted-quad address is handed back as is;
    anything else is treated as a host name and resolved with
    socket.gethostbyname().
    """
    already_numeric = is_valid_ip4_address(addr)
    return addr if already_numeric else socket.gethostbyname(addr)
87 | |
88 | |
class Ping(object):
    """
    Send ICMP echo requests to one destination and keep round-trip statistics.

    Attributes set by the constructor:

    * destination  -- host name or address as given by the caller
    * dest_ip      -- resolved IPv4 address (None if resolution did not happen)
    * timeout      -- time to wait for one reply, in milliseconds
    * packet_size  -- number of payload bytes per echo request
    * own_id       -- 16-bit ICMP identifier used to recognise our replies
    * seq_number   -- sequence number of the next/current request
    * send_count / receive_count -- packets sent / replies received
    * min_time / max_time / total_time -- round-trip statistics in ms
    """

    def __init__(self, destination, timeout=1000, packet_size=55, own_id=None):
        """
        Resolve *destination* and reset the statistics.

        :param destination: host name or dotted-quad IPv4 address.
        :param timeout: per-ping reply timeout in milliseconds.
        :param packet_size: number of payload bytes to send.
        :param own_id: ICMP identifier; defaults to the low 16 bits of the
            process id.  Only the low 16 bits are used because the ICMP
            header field is 16 bits wide.

        If the host name cannot be resolved, print_unknown_host() is
        called, which terminates the process.
        """
        self.destination = destination
        self.timeout = timeout
        self.packet_size = packet_size
        if own_id is None:
            own_id = os.getpid()
        self.own_id = own_id & 0xFFFF

        # Defined up front so the attribute exists even if a subclass makes
        # print_unknown_host() return instead of exiting.
        self.dest_ip = None
        try:
            # FIXME: Use destination only for display this line here? see: https://github.com/jedie/python-ping/issues/3
            self.dest_ip = to_ip(self.destination)
        except socket.gaierror as e:
            self.print_unknown_host(e)

        self.seq_number = 0
        self.send_count = 0
        self.receive_count = 0
        self.min_time = 999999999
        self.max_time = 0.0
        self.total_time = 0.0

    #--------------------------------------------------------------------------

    @staticmethod
    def _error_text(e):
        """Return the message part of a socket error, whatever its arg count."""
        args = getattr(e, "args", ())
        if len(args) > 1:
            return args[1]
        return str(e)

    def print_start(self):
        """Print the banner naming the destination and the payload size."""
        print("\nPYTHON-PING %s (%s): %d data bytes" % (self.destination, self.dest_ip, self.packet_size))

    def print_unknown_host(self, e):
        """Report a name resolution failure *e* and exit with status -1."""
        print("\nPYTHON-PING: Unknown host: %s (%s)\n" % (self.destination, self._error_text(e)))
        sys.exit(-1)

    def print_success(self, delay, ip, packet_size, ip_header, icmp_header):
        """
        Print one line describing a received reply.

        :param delay: round-trip time in milliseconds.
        :param ip: source address of the reply.
        :param packet_size: number of payload bytes received.
        :param ip_header: dict of IP header fields (needs "ttl").
        :param icmp_header: dict of ICMP header fields (needs "seq_number").
        """
        if ip == self.destination:
            from_info = ip
        else:
            from_info = "%s (%s)" % (self.destination, ip)

        print("%d bytes from %s: icmp_seq=%d ttl=%d time=%.1f ms" % (
            packet_size, from_info, icmp_header["seq_number"], ip_header["ttl"], delay)
        )

    def print_failed(self):
        """Print the message for a ping that received no reply in time."""
        print("Request timed out.")

    def print_exit(self):
        """Print the summary statistics collected so far."""
        print("\n----%s PYTHON PING Statistics----" % (self.destination))

        lost_count = self.send_count - self.receive_count
        if self.send_count:
            lost_rate = float(lost_count) / self.send_count * 100.0
        else:
            # Nothing was sent (e.g. interrupted before the first ping or
            # every send failed): avoid dividing by zero.
            lost_rate = 0.0

        print("%d packets transmitted, %d packets received, %0.1f%% packet loss" % (
            self.send_count, self.receive_count, lost_rate
        ))

        if self.receive_count > 0:
            print("round-trip (ms) min/avg/max = %0.3f/%0.3f/%0.3f" % (
                self.min_time, self.total_time / self.receive_count, self.max_time
            ))

        print("")

    #--------------------------------------------------------------------------

    def signal_handler(self, signum, frame):
        """
        Handle print_exit via signals: print the statistics, then exit.
        """
        self.print_exit()
        print("\n(Terminated with signal %d)\n" % (signum))
        sys.exit(0)

    def setup_signal_handler(self):
        """Install signal_handler() for Ctrl-C and, where present, Ctrl-Break."""
        signal.signal(signal.SIGINT, self.signal_handler)   # Handle Ctrl-C
        if hasattr(signal, "SIGBREAK"):
            # Handle Ctrl-Break e.g. under Windows
            signal.signal(signal.SIGBREAK, self.signal_handler)

    #--------------------------------------------------------------------------

    def header2dict(self, names, struct_format, data):
        """
        Unpack raw header bytes into a dict.

        :param names: field names, in the order of the struct fields.
        :param struct_format: struct format string describing *data*.
        :param data: the raw header bytes.
        :return: dict mapping each name to its unpacked value.
        """
        unpacked_data = struct.unpack(struct_format, data)
        return dict(zip(names, unpacked_data))

    #--------------------------------------------------------------------------

    def run(self, count=None, deadline=None):
        """
        Send and receive pings in a loop, then print the statistics.

        :param count: stop after this many pings (None/0: no limit).
        :param deadline: stop once the accumulated round-trip time (ms)
            reaches this value (None/0: no limit).
        """
        self.setup_signal_handler()

        while True:
            delay = self.do()

            self.seq_number += 1
            if count and self.seq_number >= count:
                break
            if deadline and self.total_time >= deadline:
                break

            if delay is None:
                delay = 0

            # Pause for the remainder of the MAX_SLEEP period (if applicable)
            if MAX_SLEEP > delay:
                time.sleep((MAX_SLEEP - delay) / 1000.0)

        self.print_exit()

    def do(self):
        """
        Send one ICMP ECHO_REQUEST and wait for the response until self.timeout.

        :return: the round-trip time in milliseconds, or None if sending
            failed or no matching reply arrived in time.
        :raises socket.error: if the raw socket cannot be created; for
            "operation not permitted" the message is extended with a hint
            about the required privileges.
        """
        try: # One could use UDP here, but it's obscure
            current_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
        except socket.error as e:
            if e.errno == 1:
                # Operation not permitted - Add more information to the error
                raise socket.error(
                    e.errno,
                    "%s - Note that ICMP messages can only be send from processes running as root."
                    % self._error_text(e)
                )
            raise # raise the original error

        # Whatever happens below, the socket must not leak.
        try:
            send_time = self.send_one_ping(current_socket)
            if send_time is None:
                return None
            self.send_count += 1

            receive_time, packet_size, ip, ip_header, icmp_header = self.receive_one_ping(current_socket)
        finally:
            current_socket.close()

        if receive_time is None:
            return None

        self.receive_count += 1
        delay = (receive_time - send_time) * 1000.0
        self.total_time += delay
        self.min_time = min(self.min_time, delay)
        self.max_time = max(self.max_time, delay)
        return delay

    def _build_payload(self):
        """Return self.packet_size payload bytes counting up from 0x42 (mod 256)."""
        start = 0x42
        # bytes(bytearray(...)) yields the raw bytes on Python 2 and 3 alike;
        # bytes(list) would give the list's repr on Python 2.
        return bytes(bytearray((start + i) & 0xff for i in range(self.packet_size)))

    def send_one_ping(self, current_socket):
        """
        Send one ICMP ECHO_REQUEST.

        :param current_socket: raw ICMP socket to send on.  It is closed
            here if sending fails.
        :return: the timestamp taken just before sending, or None on failure.
        """
        # Header is type (8), code (8), checksum (16), id (16), sequence (16)
        sequence = self.seq_number & 0xFFFF  # the header field is 16 bits wide
        data = self._build_payload()

        # Make a dummy header with a 0 checksum.
        header = struct.pack("!BBHHH", ICMP_ECHO, 0, 0, self.own_id, sequence)

        # Calculate the checksum on the data and the dummy header.
        checksum = calculate_checksum(header + data)

        # Now that we have the right checksum, we put that in. It's just easier
        # to make up a new header than to stuff it into the dummy.
        header = struct.pack("!BBHHH", ICMP_ECHO, 0, checksum, self.own_id, sequence)
        packet = header + data

        # Send to the already resolved address so that no DNS lookup happens
        # between taking the timestamp and the packet leaving.
        target = getattr(self, "dest_ip", None) or self.destination

        send_time = default_timer()
        try:
            current_socket.sendto(packet, (target, 1)) # Port number is irrelevant for ICMP
        except socket.error as e:
            print("General failure (%s)" % self._error_text(e))
            current_socket.close()
            return None

        return send_time

    def receive_one_ping(self, current_socket):
        """
        Wait for the echo reply matching the request that was just sent.

        Packets that are too short, are not echo replies, or carry another
        identifier or sequence number are ignored; the time spent waiting
        for them is deducted from the remaining timeout (self.timeout, ms).

        :param current_socket: raw ICMP socket to read from.
        :return: tuple (receive_time, packet_size, ip, ip_header, icmp_header);
            on timeout the tuple is (None, 0, 0, 0, 0).
        """
        timeout = self.timeout / 1000.0
        expected_sequence = self.seq_number & 0xFFFF

        while True: # Loop while waiting for packet or timeout
            select_start = default_timer()
            inputready, _, _ = select.select([current_socket], [], [], timeout)
            select_duration = default_timer() - select_start
            if not inputready: # timeout
                return None, 0, 0, 0, 0

            receive_time = default_timer()

            packet_data, address = current_socket.recvfrom(ICMP_MAX_RECV)

            # NOTE(review): a 20 byte IP header (no IP options) is assumed,
            # as in the original code.
            if len(packet_data) >= 28:
                icmp_header = self.header2dict(
                    names=[
                        "type", "code", "checksum",
                        "packet_id", "seq_number"
                    ],
                    struct_format="!BBHHH",
                    data=packet_data[20:28]
                )

                is_our_reply = (
                    icmp_header["type"] == ICMP_ECHOREPLY
                    and icmp_header["packet_id"] == self.own_id
                    and icmp_header["seq_number"] == expected_sequence
                )
                if is_our_reply:
                    ip_header = self.header2dict(
                        names=[
                            "version", "type", "length",
                            "id", "flags", "ttl", "protocol",
                            "checksum", "src_ip", "dest_ip"
                        ],
                        struct_format="!BBHHHBBHII",
                        data=packet_data[:20]
                    )
                    packet_size = len(packet_data) - 28
                    ip = socket.inet_ntoa(struct.pack("!I", ip_header["src_ip"]))
                    # XXX: Why not ip = address[0] ???
                    return receive_time, packet_size, ip, ip_header, icmp_header

            timeout = timeout - select_duration
            if timeout <= 0:
                return None, 0, 0, 0, 0
327 | |
328 | |
def verbose_ping(hostname, timeout=1000, count=3, packet_size=55):
    """
    Ping *hostname* *count* times and print the resulting statistics.

    *timeout* (milliseconds) and *packet_size* (payload bytes) are handed
    to the Ping constructor unchanged.
    """
    Ping(hostname, timeout, packet_size).run(count)
332 | |
333 | |
if __name__ == '__main__':
    # FIXME: Add a real CLI
    # Without arguments a demo is run; with exactly one argument that host
    # is pinged; anything else is a usage error.
    if len(sys.argv) == 1:
        print("DEMO")

        # These should work:
        verbose_ping("heise.de")
        verbose_ping("google.com")

        # Inconsistent on Windows w/ ActivePython (Python 3.2 resolves correctly
        # to the local host, but 2.7 tries to resolve to the local *gateway*)
        verbose_ping("localhost")

        # Should fail with 'getaddrinfo print_failed'.  The unknown-host
        # handler exits the process, so the exit is intercepted here to let
        # the remaining demo calls run.
        try:
            verbose_ping("foobar_url.foobar")
        except SystemExit:
            pass

        # Should fail (timeout), but it depends on the local network:
        verbose_ping("192.168.255.254")

        # Should fails with 'The requested address is not valid in its context':
        verbose_ping("0.0.0.0")
    elif len(sys.argv) == 2:
        verbose_ping(sys.argv[1])
    else:
        # sys.exit() with a string prints it to stderr and exits with status 1.
        sys.exit("Error: call ./ping.py domain.tld")