X-Git-Url: http://git.harvie.cz/?p=svn%2FPrometheus-QoS%2F.git;a=blobdiff_plain;f=optional-tools%2Fping.py;fp=optional-tools%2Fping.py;h=4d6660363fc84ddbe9c995294ecaff56929f9eac;hp=0000000000000000000000000000000000000000;hb=00902283d975cc4cf209b670839f2cd58d756942;hpb=8eb6a62744fc5aa425ebd3d931a6145ea2898926 diff --git a/optional-tools/ping.py b/optional-tools/ping.py new file mode 100644 index 0000000..4d66603 --- /dev/null +++ b/optional-tools/ping.py @@ -0,0 +1,358 @@ +#!/usr/bin/env python +# coding: utf-8 + +""" + A pure python ping implementation using raw sockets. + + Note that ICMP messages can only be send from processes running as root + (in Windows, you must run this script as 'Administrator'). + + Bugs are naturally mine. I'd be glad to hear about them. There are + certainly word - size dependencies here. + + :homepage: https://github.com/jedie/python-ping/ + :copyleft: 1989-2011 by the python-ping team, see AUTHORS for more details. + :license: GNU GPL v2, see LICENSE for more details. +""" + + +import array +import os +import select +import signal +import socket +import struct +import sys +import time + + +if sys.platform.startswith("win32"): + # On Windows, the best timer is time.clock() + default_timer = time.clock +else: + # On most other platforms the best timer is time.time() + default_timer = time.time + + +# ICMP parameters +ICMP_ECHOREPLY = 0 # Echo reply (per RFC792) +ICMP_ECHO = 8 # Echo request (per RFC792) +ICMP_MAX_RECV = 2048 # Max size of incoming buffer + +MAX_SLEEP = 1000 + + +def calculate_checksum(source_string): + """ + A port of the functionality of in_cksum() from ping.c + Ideally this would act on the string as a series of 16-bit ints (host + packed), but this works. + Network data is big-endian, hosts are typically little-endian + """ + if len(source_string)%2: + source_string += "\x00" + converted = array.array("H", source_string) + if sys.byteorder == "big": + converted.byteswap() + val = sum(converted) + + val &= 0xffffffff # Truncate val to 32 bits (a variance from ping.c, which + # uses signed ints, but overflow is unlikely in ping) + + val = (val >> 16) + (val & 0xffff) # Add high 16 bits to low 16 bits + val += (val >> 16) # Add carry from above (if any) + answer = ~val & 0xffff # Invert and truncate to 16 bits + answer = socket.htons(answer) + + return answer + + +def is_valid_ip4_address(addr): + parts = addr.split(".") + if not len(parts) == 4: + return False + for part in parts: + try: + number = int(part) + except ValueError: + return False + if number > 255: + return False + return True + +def to_ip(addr): + if is_valid_ip4_address(addr): + return addr + return socket.gethostbyname(addr) + + +class Ping(object): + def __init__(self, destination, timeout=1000, packet_size=55, own_id=None): + self.destination = destination + self.timeout = timeout + self.packet_size = packet_size + if own_id is None: + self.own_id = os.getpid() & 0xFFFF + else: + self.own_id = own_id + + try: + # FIXME: Use destination only for display this line here? see: https://github.com/jedie/python-ping/issues/3 + self.dest_ip = to_ip(self.destination) + except socket.gaierror as e: + self.print_unknown_host(e) +# else: +# self.print_start() + + self.seq_number = 0 + self.send_count = 0 + self.receive_count = 0 + self.min_time = 999999999 + self.max_time = 0.0 + self.total_time = 0.0 + + #-------------------------------------------------------------------------- + + def print_start(self): + print("\nPYTHON-PING %s (%s): %d data bytes" % (self.destination, self.dest_ip, self.packet_size)) + + def print_unknown_host(self, e): + print("\nPYTHON-PING: Unknown host: %s (%s)\n" % (self.destination, e.args[1])) + sys.exit(-1) + + def print_success(self, delay, ip, packet_size, ip_header, icmp_header): + if ip == self.destination: + from_info = ip + else: + from_info = "%s (%s)" % (self.destination, ip) + + print("%d bytes from %s: icmp_seq=%d ttl=%d time=%.1f ms" % ( + packet_size, from_info, icmp_header["seq_number"], ip_header["ttl"], delay) + ) + #print("IP header: %r" % ip_header) + #print("ICMP header: %r" % icmp_header) + + def print_failed(self): + print("Request timed out.") + + def print_exit(self): + print("\n----%s PYTHON PING Statistics----" % (self.destination)) + + lost_count = self.send_count - self.receive_count + #print("%i packets lost" % lost_count) + lost_rate = float(lost_count) / self.send_count * 100.0 + + print("%d packets transmitted, %d packets received, %0.1f%% packet loss" % ( + self.send_count, self.receive_count, lost_rate + )) + + if self.receive_count > 0: + print("round-trip (ms) min/avg/max = %0.3f/%0.3f/%0.3f" % ( + self.min_time, self.total_time / self.receive_count, self.max_time + )) + + print("") + + #-------------------------------------------------------------------------- + + def signal_handler(self, signum, frame): + """ + Handle print_exit via signals + """ + self.print_exit() + print("\n(Terminated with signal %d)\n" % (signum)) + sys.exit(0) + + def setup_signal_handler(self): + signal.signal(signal.SIGINT, self.signal_handler) # Handle Ctrl-C + if hasattr(signal, "SIGBREAK"): + # Handle Ctrl-Break e.g. under Windows + signal.signal(signal.SIGBREAK, self.signal_handler) + + #-------------------------------------------------------------------------- + + def header2dict(self, names, struct_format, data): + """ unpack the raw received IP and ICMP header informations to a dict """ + unpacked_data = struct.unpack(struct_format, data) + return dict(zip(names, unpacked_data)) + + #-------------------------------------------------------------------------- + + def run(self, count=None, deadline=None): + """ + send and receive pings in a loop. Stop if count or until deadline. + """ + self.setup_signal_handler() + + while True: + delay = self.do() + + self.seq_number += 1 + if count and self.seq_number >= count: + break + if deadline and self.total_time >= deadline: + break + + if delay == None: + delay = 0 + + # Pause for the remainder of the MAX_SLEEP period (if applicable) + if (MAX_SLEEP > delay): + time.sleep((MAX_SLEEP - delay) / 1000.0) + + self.print_exit() + + def do(self): + """ + Send one ICMP ECHO_REQUEST and receive the response until self.timeout + """ + try: # One could use UDP here, but it's obscure + current_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp")) + except socket.error, (errno, msg): + if errno == 1: + # Operation not permitted - Add more information to traceback + etype, evalue, etb = sys.exc_info() + evalue = etype( + "%s - Note that ICMP messages can only be send from processes running as root." % evalue + ) + raise etype, evalue, etb + raise # raise the original error + + send_time = self.send_one_ping(current_socket) + if send_time == None: + return + self.send_count += 1 + + receive_time, packet_size, ip, ip_header, icmp_header = self.receive_one_ping(current_socket) + current_socket.close() + + if receive_time: + self.receive_count += 1 + delay = (receive_time - send_time) * 1000.0 + self.total_time += delay + if self.min_time > delay: + self.min_time = delay + if self.max_time < delay: + self.max_time = delay + +# self.print_success(delay, ip, packet_size, ip_header, icmp_header) + return delay +# else: +# self.print_failed() + + def send_one_ping(self, current_socket): + """ + Send one ICMP ECHO_REQUEST + """ + # Header is type (8), code (8), checksum (16), id (16), sequence (16) + checksum = 0 + + # Make a dummy header with a 0 checksum. + header = struct.pack( + "!BBHHH", ICMP_ECHO, 0, checksum, self.own_id, self.seq_number + ) + + padBytes = [] + startVal = 0x42 + for i in range(startVal, startVal + (self.packet_size)): + padBytes += [(i & 0xff)] # Keep chars in the 0-255 range + data = bytes(padBytes) + + # Calculate the checksum on the data and the dummy header. + checksum = calculate_checksum(header + data) # Checksum is in network order + + # Now that we have the right checksum, we put that in. It's just easier + # to make up a new header than to stuff it into the dummy. + header = struct.pack( + "!BBHHH", ICMP_ECHO, 0, checksum, self.own_id, self.seq_number + ) + + packet = header + data + + send_time = default_timer() + + try: + current_socket.sendto(packet, (self.destination, 1)) # Port number is irrelevant for ICMP + except socket.error as e: + print("General failure (%s)" % (e.args[1])) + current_socket.close() + return + + return send_time + + def receive_one_ping(self, current_socket): + """ + Receive the ping from the socket. timeout = in ms + """ + timeout = self.timeout / 1000.0 + + while True: # Loop while waiting for packet or timeout + select_start = default_timer() + inputready, outputready, exceptready = select.select([current_socket], [], [], timeout) + select_duration = (default_timer() - select_start) + if inputready == []: # timeout + return None, 0, 0, 0, 0 + + receive_time = default_timer() + + packet_data, address = current_socket.recvfrom(ICMP_MAX_RECV) + + icmp_header = self.header2dict( + names=[ + "type", "code", "checksum", + "packet_id", "seq_number" + ], + struct_format="!BBHHH", + data=packet_data[20:28] + ) + + if icmp_header["packet_id"] == self.own_id: # Our packet + ip_header = self.header2dict( + names=[ + "version", "type", "length", + "id", "flags", "ttl", "protocol", + "checksum", "src_ip", "dest_ip" + ], + struct_format="!BBHHHBBHII", + data=packet_data[:20] + ) + packet_size = len(packet_data) - 28 + ip = socket.inet_ntoa(struct.pack("!I", ip_header["src_ip"])) + # XXX: Why not ip = address[0] ??? + return receive_time, packet_size, ip, ip_header, icmp_header + + timeout = timeout - select_duration + if timeout <= 0: + return None, 0, 0, 0, 0 + + +def verbose_ping(hostname, timeout=1000, count=3, packet_size=55): + p = Ping(hostname, timeout, packet_size) + p.run(count) + + +if __name__ == '__main__': + # FIXME: Add a real CLI + if len(sys.argv) == 1: + print "DEMO" + + # These should work: + verbose_ping("heise.de") + verbose_ping("google.com") + + # Inconsistent on Windows w/ ActivePython (Python 3.2 resolves correctly + # to the local host, but 2.7 tries to resolve to the local *gateway*) + verbose_ping("localhost") + + # Should fail with 'getaddrinfo print_failed': + verbose_ping("foobar_url.foobar") + + # Should fail (timeout), but it depends on the local network: + verbose_ping("192.168.255.254") + + # Should fails with 'The requested address is not valid in its context': + verbose_ping("0.0.0.0") + elif len(sys.argv) == 2: + verbose_ping(sys.argv[1]) + else: + print "Error: call ./ping.py domain.tld"