5 A pure python ping implementation using raw sockets.
7 Note that ICMP messages can only be send from processes running as root
8 (in Windows, you must run this script as 'Administrator').
10 Bugs are naturally mine. I'd be glad to hear about them. There are
11 certainly word - size dependencies here.
13 :homepage: https://github.com/jedie/python-ping/
14 :copyleft: 1989-2011 by the python-ping team, see AUTHORS for more details.
15 :license: GNU GPL v2, see LICENSE for more details.
29 if sys
.platform
.startswith("win32"):
30 # On Windows, the best timer is time.clock()
31 default_timer
= time
.clock
33 # On most other platforms the best timer is time.time()
34 default_timer
= time
.time
38 ICMP_ECHOREPLY
= 0 # Echo reply (per RFC792)
39 ICMP_ECHO
= 8 # Echo request (per RFC792)
40 ICMP_MAX_RECV
= 2048 # Max size of incoming buffer
45 def calculate_checksum(source_string
):
47 A port of the functionality of in_cksum() from ping.c
48 Ideally this would act on the string as a series of 16-bit ints (host
49 packed), but this works.
50 Network data is big-endian, hosts are typically little-endian
52 if len(source_string
)%2:
53 source_string
+= "\x00"
54 converted
= array
.array("H", source_string
)
55 if sys
.byteorder
== "big":
59 val
&= 0xffffffff # Truncate val to 32 bits (a variance from ping.c, which
60 # uses signed ints, but overflow is unlikely in ping)
62 val
= (val
>> 16) + (val
& 0xffff) # Add high 16 bits to low 16 bits
63 val
+= (val
>> 16) # Add carry from above (if any)
64 answer
= ~val
& 0xffff # Invert and truncate to 16 bits
65 answer
= socket
.htons(answer
)
70 def is_valid_ip4_address(addr
):
71 parts
= addr
.split(".")
72 if not len(parts
) == 4:
84 if is_valid_ip4_address(addr
):
86 return socket
.gethostbyname(addr
)
90 def __init__(self
, destination
, timeout
=1000, packet_size
=55, own_id
=None):
91 self
.destination
= destination
92 self
.timeout
= timeout
93 self
.packet_size
= packet_size
95 self
.own_id
= os
.getpid() & 0xFFFF
100 # FIXME: Use destination only for display this line here? see: https://github.com/jedie/python-ping/issues/3
101 self
.dest_ip
= to_ip(self
.destination
)
102 except socket
.gaierror
as e
:
103 self
.print_unknown_host(e
)
109 self
.receive_count
= 0
110 self
.min_time
= 999999999
112 self
.total_time
= 0.0
114 #--------------------------------------------------------------------------
116 def print_start(self
):
117 print("\nPYTHON-PING %s (%s): %d data bytes" % (self
.destination
, self
.dest_ip
, self
.packet_size
))
119 def print_unknown_host(self
, e
):
120 print("\nPYTHON-PING: Unknown host: %s (%s)\n" % (self
.destination
, e
.args
[1]))
123 def print_success(self
, delay
, ip
, packet_size
, ip_header
, icmp_header
):
124 if ip
== self
.destination
:
127 from_info
= "%s (%s)" % (self
.destination
, ip
)
129 print("%d bytes from %s: icmp_seq=%d ttl=%d time=%.1f ms" % (
130 packet_size
, from_info
, icmp_header
["seq_number"], ip_header
["ttl"], delay
)
132 #print("IP header: %r" % ip_header)
133 #print("ICMP header: %r" % icmp_header)
135 def print_failed(self
):
136 print("Request timed out.")
138 def print_exit(self
):
139 print("\n----%s PYTHON PING Statistics----" % (self
.destination
))
141 lost_count
= self
.send_count
- self
.receive_count
142 #print("%i packets lost" % lost_count)
143 lost_rate
= float(lost_count
) / self
.send_count
* 100.0
145 print("%d packets transmitted, %d packets received, %0.1f%% packet loss" % (
146 self
.send_count
, self
.receive_count
, lost_rate
149 if self
.receive_count
> 0:
150 print("round-trip (ms) min/avg/max = %0.3f/%0.3f/%0.3f" % (
151 self
.min_time
, self
.total_time
/ self
.receive_count
, self
.max_time
156 #--------------------------------------------------------------------------
158 def signal_handler(self
, signum
, frame
):
160 Handle print_exit via signals
163 print("\n(Terminated with signal %d)\n" % (signum
))
166 def setup_signal_handler(self
):
167 signal
.signal(signal
.SIGINT
, self
.signal_handler
) # Handle Ctrl-C
168 if hasattr(signal
, "SIGBREAK"):
169 # Handle Ctrl-Break e.g. under Windows
170 signal
.signal(signal
.SIGBREAK
, self
.signal_handler
)
172 #--------------------------------------------------------------------------
174 def header2dict(self
, names
, struct_format
, data
):
175 """ unpack the raw received IP and ICMP header informations to a dict """
176 unpacked_data
= struct
.unpack(struct_format
, data
)
177 return dict(zip(names
, unpacked_data
))
179 #--------------------------------------------------------------------------
181 def run(self
, count
=None, deadline
=None):
183 send and receive pings in a loop. Stop if count or until deadline.
185 self
.setup_signal_handler()
191 if count
and self
.seq_number
>= count
:
193 if deadline
and self
.total_time
>= deadline
:
199 # Pause for the remainder of the MAX_SLEEP period (if applicable)
200 if (MAX_SLEEP
> delay
):
201 time
.sleep((MAX_SLEEP
- delay
) / 1000.0)
207 Send one ICMP ECHO_REQUEST and receive the response until self.timeout
209 try: # One could use UDP here, but it's obscure
210 current_socket
= socket
.socket(socket
.AF_INET
, socket
.SOCK_RAW
, socket
.getprotobyname("icmp"))
211 except socket
.error
, (errno
, msg
):
213 # Operation not permitted - Add more information to traceback
214 etype
, evalue
, etb
= sys
.exc_info()
216 "%s - Note that ICMP messages can only be send from processes running as root." % evalue
218 raise etype
, evalue
, etb
219 raise # raise the original error
221 send_time
= self
.send_one_ping(current_socket
)
222 if send_time
== None:
226 receive_time
, packet_size
, ip
, ip_header
, icmp_header
= self
.receive_one_ping(current_socket
)
227 current_socket
.close()
230 self
.receive_count
+= 1
231 delay
= (receive_time
- send_time
) * 1000.0
232 self
.total_time
+= delay
233 if self
.min_time
> delay
:
234 self
.min_time
= delay
235 if self
.max_time
< delay
:
236 self
.max_time
= delay
238 # self.print_success(delay, ip, packet_size, ip_header, icmp_header)
241 # self.print_failed()
243 def send_one_ping(self
, current_socket
):
245 Send one ICMP ECHO_REQUEST
247 # Header is type (8), code (8), checksum (16), id (16), sequence (16)
250 # Make a dummy header with a 0 checksum.
251 header
= struct
.pack(
252 "!BBHHH", ICMP_ECHO
, 0, checksum
, self
.own_id
, self
.seq_number
257 for i
in range(startVal
, startVal
+ (self
.packet_size
)):
258 padBytes
+= [(i
& 0xff)] # Keep chars in the 0-255 range
259 data
= bytes(padBytes
)
261 # Calculate the checksum on the data and the dummy header.
262 checksum
= calculate_checksum(header
+ data
) # Checksum is in network order
264 # Now that we have the right checksum, we put that in. It's just easier
265 # to make up a new header than to stuff it into the dummy.
266 header
= struct
.pack(
267 "!BBHHH", ICMP_ECHO
, 0, checksum
, self
.own_id
, self
.seq_number
270 packet
= header
+ data
272 send_time
= default_timer()
275 current_socket
.sendto(packet
, (self
.destination
, 1)) # Port number is irrelevant for ICMP
276 except socket
.error
as e
:
277 print("General failure (%s)" % (e
.args
[1]))
278 current_socket
.close()
283 def receive_one_ping(self
, current_socket
):
285 Receive the ping from the socket. timeout = in ms
287 timeout
= self
.timeout
/ 1000.0
289 while True: # Loop while waiting for packet or timeout
290 select_start
= default_timer()
291 inputready
, outputready
, exceptready
= select
.select([current_socket
], [], [], timeout
)
292 select_duration
= (default_timer() - select_start
)
293 if inputready
== []: # timeout
294 return None, 0, 0, 0, 0
296 receive_time
= default_timer()
298 packet_data
, address
= current_socket
.recvfrom(ICMP_MAX_RECV
)
300 icmp_header
= self
.header2dict(
302 "type", "code", "checksum",
303 "packet_id", "seq_number"
305 struct_format
="!BBHHH",
306 data
=packet_data
[20:28]
309 if icmp_header
["packet_id"] == self
.own_id
: # Our packet
310 ip_header
= self
.header2dict(
312 "version", "type", "length",
313 "id", "flags", "ttl", "protocol",
314 "checksum", "src_ip", "dest_ip"
316 struct_format
="!BBHHHBBHII",
317 data
=packet_data
[:20]
319 packet_size
= len(packet_data
) - 28
320 ip
= socket
.inet_ntoa(struct
.pack("!I", ip_header
["src_ip"]))
321 # XXX: Why not ip = address[0] ???
322 return receive_time
, packet_size
, ip
, ip_header
, icmp_header
324 timeout
= timeout
- select_duration
326 return None, 0, 0, 0, 0
329 def verbose_ping(hostname
, timeout
=1000, count
=3, packet_size
=55):
330 p
= Ping(hostname
, timeout
, packet_size
)
334 if __name__
== '__main__':
335 # FIXME: Add a real CLI
336 if len(sys
.argv
) == 1:
340 verbose_ping("heise.de")
341 verbose_ping("google.com")
343 # Inconsistent on Windows w/ ActivePython (Python 3.2 resolves correctly
344 # to the local host, but 2.7 tries to resolve to the local *gateway*)
345 verbose_ping("localhost")
347 # Should fail with 'getaddrinfo print_failed':
348 verbose_ping("foobar_url.foobar")
350 # Should fail (timeout), but it depends on the local network:
351 verbose_ping("192.168.255.254")
353 # Should fails with 'The requested address is not valid in its context':
354 verbose_ping("0.0.0.0")
355 elif len(sys
.argv
) == 2:
356 verbose_ping(sys
.argv
[1])
358 print "Error: call ./ping.py domain.tld"
This page took 0.386399 seconds and 4 git commands to generate.