相关文章推荐
class tcp_client():
    """Blocking JSON-over-TCP client for the picture-taking server.

    Every request is a single JSON object; every reply is read with one
    ``recv(1024)`` and parsed as JSON.
    """

    def __init__(self, tcp_port):
        """Connect to 127.0.0.1:``tcp_port`` and perform the ``init`` handshake.

        Prints the greeting the server sends right after accepting the
        connection, then sends ``{"command": "init"}`` and reads the reply.

        Args:
            tcp_port: TCP port of the server on the local machine.
        """
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(('127.0.0.1', tcp_port))
        print(self.s.recv(1024).decode())
        # The reply to "init" carries nothing the client needs; it is read
        # only so the request/reply sequence stays in step.
        self._request({'command': 'init'})

    def _request(self, payload):
        """Send ``payload`` as JSON and return the reply parsed from JSON."""
        self.s.sendall(json.dumps(payload).encode())
        return json.loads(self.s.recv(1024).decode())

    def step(self):
        """Ask the server to take the picture at position "1".

        Returns:
            The ``result`` field found under ``return`` in the server's reply.
        """
        reply = self._request(
            {'command': 'take_picture', 'parameters': {'position': '1'}}
        )
        print(" 获得拍照结果 ")
        return reply['return']['result']

    def get_result(self):
        """Fetch the detection result from the server, print it and return it.

        Returns:
            The ``result`` field found under ``return`` in the server's reply.
        """
        reply = self._request({'command': 'get_result'})
        res = reply['return']['result']
        print(res)
        return res

    def close_tcp(self):
        """Send the ``close`` command to the server, then close the socket."""
        print(" close tcp ... ")
        try:
            self.s.sendall(json.dumps({'command': 'close'}).encode())
        finally:
            # Release the socket even if the server is already gone.
            self.s.close()


# NOTE: TCP_PORT is expected to be defined by the surrounding script.
client = tcp_client(tcp_port=TCP_PORT)
# Test sequence
client.step()
client.get_result()
client.close_tcp()

服务端代码如下:

# python 3
# 服务端
import numpy as np
import socket
import threading
import json
import logging
# TCP port the server binds to and listens on.
TCP_PORT = 12005
def one_servicer():
    """Run the JSON-over-TCP command server on ``TCP_PORT``.

    Listens on all interfaces and accepts connections forever. Each accepted
    connection is served by ``tcplink`` on its own thread, so several clients
    can be connected at the same time.
    """
    # Message printed once the picture for a given "position" has been taken.
    picture_messages = {
        "1": "第一次拍照完成",
        "2": "第二次拍照完成",
        "3": "第三次拍照完成",
        "4": "第四次拍照完成",
        "5": "第五次拍照完成",
        "6": "第六次拍照完成",
    }

    def tcplink(sock, addr):
        """Serve one client until it sends ``close`` or disconnects.

        Args:
            sock: Connected socket returned by ``accept``.
            addr: ``(host, port)`` pair of the peer, used for log output.
        """
        print('Accept new connection from %s:%s...' % addr)
        try:
            sock.sendall(('connect success!').encode())
            while True:
                data = sock.recv(1024).decode()
                if not data:
                    # recv() returning nothing means the peer closed the
                    # connection without sending "close".
                    break
                print("data: ", data)
                data_json = json.loads(data)
                command = data_json["command"]
                print('data_json["command"]: ', command)

                if command == "close":
                    break

                # Default reply: echo the raw request back as a JSON string.
                # Commands without a dedicated handler (for example the
                # client's "init" handshake) rely on getting some reply.
                reply = data
                if command == "get_result":
                    # Report whether the picture check is ok or ng.
                    reply = {'command': "get_result", 'return': {'result': "ok"}}
                elif command == "take_picture":
                    position = data_json['parameters']['position']
                    if position in picture_messages:
                        # The camera capture code for this position goes here.
                        print(picture_messages[position])
                        reply = {'command': "take_picture", 'return': {'result': "ok"}}

                sock.sendall(json.dumps(reply).encode())
        finally:
            # Always release the client socket, even when a request is
            # malformed and raises.
            sock.close()
            print('Connection from %s:%s closed.' % addr)

    # Create the socket; the ``with`` block closes it if the loop is left.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Listen on the port.
        s.bind(('', TCP_PORT))
        s.listen(5)
        print('Waiting for connection...')
        while True:
            # Wait for a new connection.
            sock, addr = s.accept()
            # Hand the connection to its own thread. The callable and its
            # arguments are passed separately so the handler runs on the new
            # thread instead of inside this accept loop.
            t = threading.Thread(target=tcplink, args=(sock, addr))
            t.start()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
        one_servicer()
 
推荐文章