initial commit

This commit is contained in:
mingyang 2019-12-28 16:26:19 +08:00
commit 9b54936e28
19 changed files with 876 additions and 0 deletions

8
example.py Normal file
View File

@ -0,0 +1,8 @@
from imshow.client import client
import cv2
img1 = cv2.imread('./img/1.jpg')
img2 = cv2.imread('./img/2.jpg')
while True:
client.imshow('Name', img1, waitKey=1)
client.imshow('Name', img2, waitKey=1)

BIN
img/1.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

BIN
img/2.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

42
imshow/client.py Normal file
View File

@ -0,0 +1,42 @@
import cv2
import socket
from imshow import wrap
from imshow import network
from imshow import parallel
from imshow import config
token = config.token
host = '192.168.123.222'
port = 12345
class ImageClient():
def __init__(self, host, port, token):
self.host = host
self.port = port
self.token = token
self.sock = network.SocketConnection(parallel.daemon)
self.sock.connect(self.host, self.port)
self.sock.auth(self.token)
self.log = self.sock.log
def imshow(self, name, img, waitKey=0):
self.log('Sending Image {0} to host {1}:{2}'.format(name, host, port))
img = wrap.ImageMessage(img=img, name=name, wait=waitKey)
self.sock.send(img.tobytes())
exit = False
self.log('Waiting for host message to exit.')
while not exit:
msg = self.sock.recv()
if msg.decode('utf-8') == 'continue':
exit = True
client = ImageClient(host, port, token)
if __name__ == '__main__':
img1 = cv2.imread('./img/1.jpg')
img2 = cv2.imread('./img/2.jpg')
while True:
client.imshow('Name', img1, waitKey=1)
client.imshow('Name', img2, waitKey=1)

1
imshow/config.py Normal file
View File

@ -0,0 +1 @@
token = 'sometoken'

447
imshow/network.py Normal file
View File

@ -0,0 +1,447 @@
import socket
import zlib
import json
import struct
import queue
from imshow import parallel
class SocketMessage():
def __init__(self, msg={}):
self.data = msg
self.type = 'empty'
def encode(self):
pass
def decode(self):
pass
def tobytes(self):
self.encode()
js = {}
js['type'] = self.type
js['msg'] = self.data
js = json.dumps(js)
js = js.encode('utf-8')
byte = zlib.compress(js)
return byte
def frombytes(self, byte):
js = zlib.decompress(byte)
js = js.decode('utf-8')
js = json.loads(js)
msg = js['msg']
self.data = msg
self.decode()
class SocketServer():
def __init__(self, ip='0.0.0.0', port=12345, message_handler=None):
self.daemon = parallel.daemon
self.ip = ip
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.ip, self.port))
self.socket.listen(5)
self.terminate = False
self.handler = message_handler
def handle_message(self, clientsocket, addr):
# print('Connection Established from clinet', addr)
if self.handler is not None:
self.handler(clientsocket, addr)
pass
def loop(self):
while not self.terminate:
clientsocket,addr = self.socket.accept()
self.daemon.add_job(
self.handle_message,
args=[clientsocket, addr],
name='Client[{0}]'.format(addr)
)
def start(self, back=True):
if back:
self.daemon.add_job(self.loop, name='SocketMainLoop')
else:
self.loop()
# what should a packet header contains:
# 1. message id
# 2. packet id
# 3. total packets
# 4. total size
class PacketHeader():
def __init__(self, mid=0, pid=0, pn=0, sz=0):
self.msg_id = mid
self.pkt_id = pid
self.pkt_num = pn
self.msg_sz = sz
self.header_size = 16
def tobytes(self):
b = struct.pack('LLLL', self.msg_id, self.pkt_id, self.pkt_num, self.msg_sz)
return b
def frombytes(self, b):
self.msg_id, self.pkt_id, self.pkt_num, self.msg_sz = struct.unpack('LLLL', b)
return self
class Packet():
def __init__(self, header=PacketHeader(), msg=b''):
self.header = header
self.msg = msg
self.header_size = self.header.header_size
def frombytes(self, b):
header = b[:self.header_size]
self.msg = b[self.header_size:]
self.header.frombytes(header)
return self
def tobytes(self):
msg = b''
self.header.msg_sz = len(self.msg)
msg += self.header.tobytes()
msg += self.msg
return msg
class PacketFactory():
def __init__(self, max_size=8192, log=None):
self.max_size = max_size
self.id = 0
self.header_size = PacketHeader().header_size
self.log = print
if log != None:
self.log = log
def to_packets(self, msg):
packets = []
length = len(msg)
capacity = self.max_size - self.header_size
num_packets = int((length + capacity - 1) / capacity)
for i in range(num_packets):
header = PacketHeader(self.id, i, num_packets)
packet = Packet(header, msg[i*capacity:(i+1)*capacity])
packets.append(packet)
self.id += 1
return packets
def from_packets(self, packets):
num_packet = len(packets)
self.log('Packet Number:', num_packet)
if num_packet == 0:
return None
msg_id = None
packet_id = 0
message = b''
for packet in packets:
header = packet.header
if msg_id is None:
msg_id = header.msg_id
if num_packet != header.pkt_num:
self.log('Uncorrect Package Number')
self.log('get {0} while it should be {1}'.format(header.pkt_num, num_packet))
return None
if msg_id != header.msg_id:
self.log('Uncorrect Message id')
self.log('get {0} while it should be {1}'.format(header.pkt_num, msg_id))
return None
if packet_id != header.pkt_id:
self.log('Uncorrect pkt id')
self.log('get {0} while it should be {1}'.format(header.pkt_id, packet_id))
return None
message += packet.msg
packet_id += 1
return message
class AuthMessage(SocketMessage):
def __init__(self, token='None'):
super(AuthMessage, self).__init__()
self.token = token
self.type = 'auth'
self.stat = 0
# stat:
# 0: auth client
# 1: auth success
# 2: auth failed.
def encode(self):
self.data = {}
self.data['token'] = self.token
self.data['status'] = self.stat
def decode(self):
self.token = self.data['token']
self.stat = self.data['status']
class SocketConnection():
def __init__(self, daemon, log_prefix=''):
self.sock = None
self.daemon = daemon
self.messages = queue.Queue()
self.terminated = False
self.factory = PacketFactory(log=self.log)
self.header_size = PacketHeader().header_size
self.loglevel = 2
self.log_prefix = log_prefix
# log level:
# 0 : debug
# 1 : message or info
# 2 : warning
# 3 : error
def log(self, *args, level=0, end='\n'):
if level >= self.loglevel:
print(self.log_prefix, end=' ')
for msg in args:
print(str(msg), end=' ')
print(end, end='')
def start(self):
self.jid = self.daemon.add_job(self.recv_bare, name='connection')
def SetSock(self, sock):
self.sock = sock
self.start()
def connect(self, host, port):
if self.sock is None:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
self.log('Sock Connected', level=1)
else:
self.log('Socket has already established, ingoring connect.', level=2)
self.start()
def auth(self, token):
if self.sock is None:
self.log('Socket not established, unable to auth.', level=3)
return None
auth_msg = AuthMessage(token)
msg = auth_msg.tobytes()
self.log('Sending Authentication Message.')
self.send(msg)
self.log('Waiting Authentication Status.')
msg = self.recv()
auth_msg.frombytes(msg)
self.log('Status Recived! auth_status =', auth_msg.__dict__)
if auth_msg.stat == 1:
self.log('Auth Success!', level=1)
else:
self.log('Auth Failed!', level=3)
self.close()
def WaitAuth(self, token):
msg = self.recv()
auth_msg = AuthMessage()
auth_msg.frombytes(msg)
if auth_msg.token != token:
self.log('Authentication Failed!', level=1)
auth_msg = AuthMessage('InvalidAuth')
auth_msg.stat = 2
self.send(auth_msg.tobytes())
self.close()
else:
self.log('Authentication Success!', level=1)
auth_msg = AuthMessage('Welcome')
auth_msg.stat = 1
self.send(auth_msg.tobytes())
def send(self, msg):
if self.terminated:
return False
packets = self.factory.to_packets(msg)
self.log('spliting message to {0} packets'.format(len(packets)))
for packet in packets:
self.sock.send(packet.tobytes())
return True
def commit_message(self, packets):
self.log('Generating final packet...')
full_msg = self.factory.from_packets(packets)
if full_msg is not None:
self.log('Valid package!')
self.messages.put(full_msg)
else:
self.log('Invalid package')
def recv_bare(self):
msg_id = None
packets = []
while not self.terminated:
raw_msg = None
try:
raw_msg = self.sock.recv(8192)
except (ConnectionAbortedError, ConnectionResetError):
self.log('Connection Stopped')
self.close()
continue
if raw_msg is None or len(raw_msg) == 0:
self.close()
continue
if len(raw_msg) < self.header_size:
continue
pkt = Packet(PacketHeader())
pkt.frombytes(raw_msg)
packets.append(pkt)
if msg_id is None:
msg_id = pkt.header.msg_id
if msg_id != pkt.header.msg_id:
msg_id = pkt.header.msg_id
packets = [pkt]
continue
if pkt.header.pkt_id == pkt.header.pkt_num - 1:
self.log('Finished')
self.commit_message(packets)
msg_id = None
packets = []
def recv(self):
self.log('Getting messages')
msg = None
while msg is None and not self.terminated:
try:
msg = self.messages.get(timeout=0.1)
except queue.Empty:
msg = None
continue
return msg
self.log('Message Get Finished')
def close(self):
self.terminated = True
self.sock.close()
class HttpServer(SocketServer):
def __init__(self):
super(HttpServer, self).__init__(port=80)
self.header = 'HTTP/1.1 200 OK\nServer: NaiveHttpServer\nConnection: close\nContent-Length: {0}\nContent-Type: text/html\n\n'
def handle_message(self, clientsocket, addr):
# print('Connection Established from clinet', addr)
msg = clientsocket.recv(8192)
text = msg.decode('utf-8')
while '\n' in text:
text = text.replace('\n', '<br>')
info = '<html><body>'
info += '<h1> Hello, World </h1>'
info += '<h2> Your IP Address & Port</h2>\n'
info += '<p>{0}</p>'.format(str(addr))
info += '<h2> Your Request</h2>\n'
info += '<p>{0}</p>\n'.format(text)
info += '</body></html>\n'
length = len(info)
# print('length =', length)
header = self.header.format(length)
msg = header + info
# print('msg:', msg)
clientsocket.send(msg.encode('utf-8'))
# print('message sent!')
clientsocket.close()
class BridgeConnection():
def __init__(self, client, server, num_threads=8):
self.client = client
self.server = server
self.daemon = parallel.ParallelHost(num_threads)
self.exit = False
self.client_terminated = False
self.server_terminated = False
def is_terminated(self):
if self.client_terminated:
return True
else:
return False
def client_recv_handler(self, msg):
# print('send message to server, size=', len(msg))
self.server.send(msg)
def stop(self):
self.daemon.stop('kill')
self.server.close()
print('connection terminated successfully.')
def client_recv(self):
while True:
msg = self.client.recv(8192)
if len(msg) == 0:
self.exit = True
break
# print('message received from clinet, size=', len(msg))
self.daemon.add_job(self.client_recv_handler, args=[msg])
self.client_terminated = True
print('Client Recv terminated.')
def server_recv_handler(self, msg):
# print('send message to client, size=', len(msg))
self.client.send(msg)
def server_recv(self):
while not self.exit:
try:
msg = self.server.recv(8192)
except ConnectionAbortedError:
break
if len(msg) == 0:
self.exit = True
continue
# print('message received from server, size=', len(msg))
self.daemon.add_job(self.server_recv_handler, args=[msg])
self.server_terminated = True
print('Server Recv terminated.')
def run(self):
self.daemon.add_job(self.server_recv)
self.daemon.add_job(self.client_recv)
while not self.is_terminated():
time.sleep(1)
self.stop()
def SendMessage(host, port, msg):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
s.send(msg)
print('Message Sent!')
s.close()
def portforward(dst_ip, dst_port, listen_ip, listen_port):
def port_forward_handler(clientsocket, addr):
print('Handling message from clinet', addr)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((dst_ip, dst_port))
print('Connect to dst server success!')
connection = BridgeConnection(clientsocket, s)
connection.run()
print('handler exited')
server = SocketServer(message_handler=port_forward_handler, ip=listen_ip, port=listen_port)
server.start()
# if __name__ == '__main__':
# # let's try a port forwarding server using this socker server.
# portforward('192.168.233.101', 22, '0.0.0.0', 30001)
# portforward('192.168.233.102', 22, '0.0.0.0', 30002)
# con = console.console('PortForward')
# con.interactive()
if __name__ == '__main__':
server = HttpServer()
server.start()
import time
time.sleep(10000)

129
imshow/parallel.py Normal file
View File

@ -0,0 +1,129 @@
import threading
import queue
import time
class Job():
def __init__(self, func, args=[], kwargs={}, name=None):
if name == None:
name = 'job'
self.id = None
self.name = name
self.func = func
self.args = args
self.kwargs = kwargs
self.results = None
def run(self):
self.results = self.func(*self.args, **self.kwargs)
def set_name(self, name):
self.name = name
def set_id(self, jid):
self.id = jid
def __call__(self):
self.run()
class Worker(threading.Thread):
def __init__(self, work_queue, finished_queue):
super(Worker, self).__init__()
self.queue = work_queue
self.finished = finished_queue
self.terminate = False
self.daemon=True
def stop(self):
self.terminate = True
def run(self):
while not self.terminate:
try:
task = self.queue.get(timeout=1)
task.run()
self.queue.task_done()
self.finished.put(task)
except queue.Empty:
pass
except KeyboardInterrupt:
print("you stop the threading")
class ParallelHost():
def __init__(self, num_threads=8):
self.num_threads = num_threads
self.workers = []
self.tasks = queue.Queue()
self.results = queue.Queue()
self.rets = {}
self.id = 0
for i in range(self.num_threads):
worker = Worker(self.tasks, self.results)
self.workers.append(worker)
for worker in self.workers:
worker.start()
def __del__(self):
self.stop('kill')
# soft stop: wait until all job done
# hard stop: stop even with unfinished job
# kill stop: whatever the thread is doing, exit.
def stop(self, mode='soft'):
print('Trying to stop.')
if mode == 'soft':
self.tasks.join()
print('All job finished.')
for worker in self.workers:
worker.stop()
if mode == 'kill':
worker.join(0.01)
def commit(self, job):
self.id += 1
job.set_id(self.id)
self.tasks.put(job)
return self.id
def add_job(self, func, args=[], kwargs={}, name=None):
job = Job(func, args, kwargs, name)
return self.commit(job)
def collect_all(self):
while not self.results.empty():
task = self.results.get()
jid = task.id
self.rets[jid] = task.results
def get_result(self, jid, block=False):
if jid in self.rets:
ret = self.rets[jid]
del self.rets[jid]
return ret
while True:
if self.results.empty() and not block:
break
task = self.results.get()
if task.jid == jid:
return task.results
else:
self.rets[task.jid] = task.results
def clear_results(self):
while not self.results.empty():
self.results.get()
self.rets = {}
daemon = ParallelHost()
if __name__ == '__main__':
host = ParallelHost()
def loop_print(info, num):
for i in range(num):
print(info + ':' + str(i))
time.sleep(1)
for i in range(10):
host.add_job(loop_print, ["loop_print_{0}".format(i), 5])
host.terminate('kill')

72
imshow/server.py Normal file
View File

@ -0,0 +1,72 @@
import socket
import queue
import time
import cv2
from imshow import network
from imshow import wrap
from imshow import config
from imshow import parallel
SocketServer = network.SocketServer
SocketMessage = network.SocketMessage
SocketConnection = network.SocketConnection
class ImageServer():
def __init__(self, ip, port):
self.server = SocketServer(ip=ip, port=port, message_handler=self.process_socket)
self.server.start()
self.id = 0
self.finished_id = []
self.imgs = queue.Queue()
def process_socket(self, clientsocket, addr):
connection = SocketConnection(self.server.daemon, '[{0}]'.format(str(addr)))
connection.log('Connection extablished from {0}'.format(addr))
connection.SetSock(clientsocket)
connection.log('Waiting for authentication...', level=1)
connection.WaitAuth(config.token)
while True:
message = connection.recv()
if message is None:
break
img = wrap.ImageMessage()
img.frombytes(message)
task = {}
task['img'] = img.img
task['name'] = img.name
task['wait'] = img.wait
task['id'] = self.id
task['source'] = addr
self.imgs.put(task)
self.id += 1
connection.log('waiting for showing image')
while task['id'] not in self.finished_id:
time.sleep(0.01)
continue
connection.log('image shown!')
if not connection.send('continue'.encode('utf-8')):
connection.log('Exited')
break
connection.log('client {0} disconnected'.format(addr))
def show(self):
while True:
task = None
try:
task = self.imgs.get(timeout=0.1)
except queue.Empty:
cv2.waitKey(1)
continue
img = task['img']
name = task['name']
wait = task['wait']
cv2.imshow(name, img)
cv2.waitKey(wait)
self.finished_id.append(task['id'])
if __name__ == '__main__':
server = ImageServer('0.0.0.0', 12345)
server.show()

139
imshow/utils.py Normal file
View File

@ -0,0 +1,139 @@
import pickle
import time
import os
import re
import platform
def detect_platform():
p = 'Unknown'
if platform.platform().find('Windows') != -1:
p = 'Windows'
elif platform.platform().find('Linux') != -1:
p = 'Linux'
return p
def ensure_dir_exist(directory, show_info = True):
exist = os.path.isdir(directory)
if not exist:
print('directory', directory, ' not found, creating...')
os.mkdir(directory)
def validateTitle(title):
rstr = r"[\/\\\:\*\?\"\<\>\|]" # '/ \ : * ? " < > |'
new_title = re.sub(rstr, " ", title) # 替换为空格
return new_title
def list2csv(l):
csv = ''
for item in l:
csv += str(item) + ','
csv = csv[:-1]
return csv
def clean_text(string):
if string is None:
return ''
while '\n' in string:
string = string.replace('\n', ' ')
splits = clean_split(string)
string = ''
for split in splits:
string += split + ' '
string = string[:-1]
return string
def clean_split(string, delimiter=' '):
sub_strs = string.split(delimiter)
splits = []
for sub_str in sub_strs:
if sub_str is not '':
splits.append(sub_str)
return splits
def remove_blank_in_endpoint(string):
length = len(string)
first_index = 0
for i in range(length):
if is_blank(string[first_index]):
first_index += 1
else:
break
last_index = length - 1
for i in range(length):
if is_blank(string[last_index]):
last_index -= 1
else:
break
last_index += 1
return string[first_index:last_index]
def is_blank(ch):
blank_ch = [' ', '\t', '\n']
if ch in blank_ch:
return True
else:
return False
def dict_to_arrtibute_string(attributes):
string = ''
for key in attributes:
string += key + '=\"{0}\";'.format(str(attributes[key]))
return string
def attribute_string_to_dict(attrs):
attr_dict = {}
for attr in attrs:
attr_dict[attr[0]] = attr[1]
return attr_dict
def save_python_object(obj, save_path):
with open(save_path, 'wb') as file:
pickle.dump(obj, file)
def load_python_object(path):
with open(path, 'rb') as file:
return pickle.load(file)
def delete_n(string):
while '\n' in string:
string = string.replace('\n', ' ')
return string
def remove_additional_blank(string):
words = string.split(' ')
string = ''
for word in words:
if word is not '':
string += word + ' '
return string[:-1]
def formal_text(text):
text = delete_n(text)
text = remove_additional_blank(text)
return text
def float2str(f, precision=2):
f = str(f)
f_base = f[:f.find('.') + precision]
return f_base
# ========== time realted operation ========== #
def str_day():
day = time.strftime("%Y-%m-%d", time.localtime())
return day
def time2str(t):
localtime = time.localtime(int(t))
return str_time(localtime)
def str_time(local_time = None):
if local_time is None:
local_time = time.localtime()
day = time.strftime("%Y-%m-%d-%Hh-%Mm-%Ss)", local_time)
return day
if __name__ == '__main__':
print(str_day())

34
imshow/wrap.py Normal file
View File

@ -0,0 +1,34 @@
import numpy as np
import base64
import json
import zlib
from imshow import network
SocketServer = network.SocketServer
SocketMessage = network.SocketMessage
class ImageMessage(SocketMessage):
def __init__(self, img=None, name = 'Image', wait=0):
super(ImageMessage, self).__init__()
self.img = img
self.wait = wait
self.name = name
self.type = 'image'
def encode(self):
img_bytes = self.img.tobytes()
img_bytes = base64.b64encode(img_bytes).decode('utf-8')
img_shape = self.img.shape
self.data['img'] = img_bytes
self.data['dtype'] = self.img.dtype.char
self.data['shape'] = img_shape
self.data['wait'] = self.wait
self.data['name'] = self.name
def decode(self):
byte = base64.b64decode(self.data['img'])
img = np.frombuffer(byte, dtype=np.dtype(self.data['dtype']))
img = img.reshape(self.data['shape'])
self.img = img
self.wait = self.data['wait']
self.name = str(self.data['name'])

4
server.py Normal file
View File

@ -0,0 +1,4 @@
from imshow.server import ImageServer
if __name__ == '__main__':
ImageServer('0.0.0.0', 12345).show()