class
tcp_client():
def
__init__
(self, tcp_port):
self.s
=
socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.s.connect((
'
127.0.0.1
'
,tcp_port))
print
( self.s.recv(1024
).decode())
data
= {
'
command
'
:
'
init
'
}
str_json
=
json.dumps(data)
self.s.send( str_json.encode() )
str_recv
= self.s.recv(1024
).decode()
#
print(str_recv)
data_json =
json.loads( str_recv )
def
step(self):
data
={
'
command
'
:
"
take_picture
"
,
'
parameters
'
:{
'
position
'
:
"
1
"
}}
#
print(data2['parameters']['position'])
str_json =
json.dumps(data)
self.s.send( str_json.encode() )
str_recv
= self.s.recv(1024
).decode()
data_json
=
json.loads( str_recv )
print
(
"
获得拍照结果
"
)
photo_result
= data_json[
'
return
'
][
'
result
'
]
return
photo_result
def
get_result(self):
data
={
'
command
'
:
"
get_result
"
}
str_json
=
json.dumps(data)
self.s.send(str_json.encode())
str_recv
= self.s.recv(1024
).decode()
data_json
=
json.loads( str_recv )
res
= data_json[
'
return
'
][
'
result
'
]
print
(res)
#
发送关闭客户端命令
def
close_tcp(self):
print
(
"
close tcp ...
"
)
data
= {
'
command
'
:
'
close
'
}
str_json
=
json.dumps(data)
self.s.send( str_json.encode() )
self.s.close()
client
= tcp_client( tcp_port =
TCP_PORT )
#
测试步骤
client.step()
client.get_result()
client.close_tcp()
服务端代码如下:
# python 3
# 服务端
import numpy as np
import socket
import threading
import json
import logging
TCP_PORT = 12005
def one_servicer():
#Create The Socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#Listen The Port
s.bind(('',TCP_PORT))
s.listen(5)
print('Waiting for connection...')
def tcplink(sock,addr):
print('Accept new connection from %s:%s...' % addr)
sock.send( ('connect success!').encode() )
while True:
data=sock.recv(1024).decode()
print("data: ", data)
data_json = json.loads( data )
#print('data_json["type"]: ', data_json["type"])
print('data_json["command"]: ', data_json["command"])
#if data_json["type"] == "step":
#data = { 'state' : "123", 'reward' : "456", 'done' : "789", 'info' : "ok" }
if data_json["command"] == "get_result":
#返回图片检测结果是ok或者ng
data = {'command':"get_result",'return':{'result':"ok"}}
elif data_json["command"] == "take_picture":
if data_json['parameters']['position']== "1":
#执行相机拍照代码
print("第一次拍照完成")
data = {'command':"take_picture",'return':{'result':"ok"}}
if data_json['parameters']['position']== "2":
#执行相机拍照代码
print("第二次拍照完成")
data = {'command':"take_picture",'return':{'result':"ok"}}
if data_json['parameters']['position']== "3":
#执行相机拍照代码
print("第三次拍照完成")
data = {'command':"take_picture",'return':{'result':"ok"}}
if data_json['parameters']['position']== "4":
#执行相机拍照代码
print("第四次拍照完成")
data = {'command':"take_picture",'return':{'result':"ok"}}
if data_json['parameters']['position']== "5":
#执行相机拍照代码
print("第五次拍照完成")
data = {'command':"take_picture",'return':{'result':"ok"}}
if data_json['parameters']['position']== "6":
#执行相机拍照代码
print("第六次拍照完成")
data = {'command':"take_picture",'return':{'result':"ok"}}
elif data_json["command"] == "close":
break
str_json = json.dumps(data)
sock.send( str_json.encode() )
sock.close()
print('Connection from %s:%s closed.'%addr)
while True:
# 开始一个新连接
sock,addr=s.accept()
# 创建一个线程来处理连接
t=threading.Thread(target=tcplink(sock,addr))
if __name__ == "__main__":
one_servicer()