Package artistlib
The aRTist Python library is intended to remote control and automate the radiographic simulator aRTist.
Hint: This project is in its early stages and may change significantly over time.
Getting started
Requirements
A Python 3 environment (Python 3.8 or higher) is required. The following Python packages are required as well. They usually come with a Python environment, or can be installed there easily:
- none so far.
Installation
If you want to install the package in your Python environment, you can use pip. For example, you can run the following command to make the toolbox available:
pip install artistlib
To use the package without installation, you need to download the package manually. You have the following three options:
- Download the package from PyPI. You will get a compressed archive called artistlib-X.X.X.tar.gz (where X.X.X is the current version number). Open or unpack this file to get to its contents.
- Download the repository from GitHub: press the clone button to download a ZIP file. Unpack it or open it to see its contents.
- You can also clone the repository from GitHub via the terminal:
git clone https://github.com/BAMresearch/aRTist-PythonLib.git
From any of these three options, you will get the complete package source code. It contains a folder called artistlib. If you want to use the library from your Python scripts without installing it, you need to copy the folder artistlib to the location of your Python scripts to make the package available.
Usage
Please make sure that aRTist is running and waiting for remote connection (menu: Tools>Enable remote access).
The following example code shows the basic usage:
# Basic usage example: connect to a running aRTist instance and drive it
# by sending command strings over the remote-control connection.
import artistlib as a
# Connect to aRTist (default: host='localhost', port=3658).
rc = a.Junction()
# Print aRTist's version number.
# Note: a.Junction.send(rc, ...) calls the method through the class and is
# equivalent to rc.send(...).
ver = a.Junction.send(rc, '::aRTist::GetVersion full')
print(ver)
# First scenario:
# - load a project
# - run the simulation
# - save the resulting projection from the image viewer
a.Junction.send(rc, 'FileIO::OpenAny $Xray(LibraryDir)/ExampleProjects/Schlumpfmark.aRTist')
a.Junction.send(rc, 'Engine::GoCmd')
# The text returned by the SaveFile command is appended to the message.
ans = "Saved as: "
ans += a.Junction.send(rc, 'Modules::Invoke ImageViewer SaveFile [file join $env(HOME) Pictures/artistlib.tif] true')
print(ans)
# Second scenario:
# - load a project
# - run the simulation without viewing the result
# - transfer the final projection
# - save the projection image >>>> This requires aRTist 2.12.7 or higher! <<<<
# - delete the images to release the memory
a.Junction.send(rc, 'FileIO::OpenAny $Xray(LibraryDir)/ExampleProjects/aRTist.aRTist')
a.Junction.send(rc, 'set imgList [Engine::Go]')
a.Junction.send(rc, 'RemoteControl::SendImage [lindex $imgList 0]')
# save_image() writes the image data received by the previous command to disk.
a.Junction.save_image(rc, "transferred.tif")
a.Junction.send(rc, 'foreach i $imgList {$i Delete}')
print("Saved as: transferred.tif")
About
The aRTist library was developed to enable Python scripting of the radiographic simulator aRTist.
The software is released under the Apache 2.0 license, its source code is available on GitHub.
Contributors
The following people contributed to code and documentation of the package:
- Alexander Funke
- David Schumacher
- Carsten Bellon
- David Plotzki
- David Denkler
Expand source code
# -*- coding: UTF-8 -*-
"""The aRTist Python library is intended to remote control and automate the radiographic simulator aRTist.
.. include:: ./documentation.md
"""
# NOTE(review): presumably read by pdoc to keep the 'console' and
# 'remote_access' names out of the generated documentation -- confirm.
__pdoc__ = {'console': False, 'remote_access': False}
# Expose the package version string reported by the package-local _version module.
from ._version import get_versions
__version__ = get_versions()['version']
# Remove the helper again so it does not stay in the package namespace.
del get_versions
import numpy as np
from numpy import append
import socket
import base64
import pathlib
from PIL import Image
class Junction:
    """Remote-control connection to a running aRTist instance.

    Creating an instance immediately opens a TCP connection and reads
    whatever aRTist sends right after connecting.  Commands are sent as
    newline-terminated text; replies are split into keyword-prefixed lines
    (``RESULT``, ``STDOUT``, ``BASE64``, ``IMAGE``, ``FILE``, ``SUCCESS``).

    Attributes:
        host: Host name or address of the aRTist instance.
        port: TCP port of the remote-access server.
        bufferSize: Maximum number of bytes read per ``recv`` call.
        timeout: Socket timeout in seconds used while waiting for replies.
        error: Number of replies so far that contained ``ERROR``.
        progress: Last value parsed from a ``PROGRESS`` message (0 if it
            could not be parsed).
        answer: Parts of the most recent reply, keyed by message keyword.
    """

    def __init__(self, host="localhost", port=3658, bufferSize=1024, timeout=5):
        """Store the connection settings and connect to aRTist.

        Args:
            host: Host name or address of the aRTist instance.
            port: TCP port of the remote-access server.
            bufferSize: Maximum number of bytes read per ``recv`` call.
            timeout: Socket timeout in seconds used while waiting for replies.
        """
        self.host = host
        self.port = port
        self.bufferSize = bufferSize
        self.timeout = timeout
        self.error = 0
        self.progress = 0
        self.answer = {}
        self.connect()

    def connect(self):
        """Open the TCP connection and consume aRTist's initial output.

        Returns:
            This instance, so calls can be chained.
        """
        self.S = socket.socket()  # TCP socket
        self.S.connect((self.host, self.port))
        self.S.settimeout(self.timeout)
        # command_no=0 makes listen() use a short timeout and print the text
        # aRTist sends on connect.
        self.listen(0)
        return self

    def send(self, command, msgType="RESULT"):
        """Send one command line to aRTist and return the selected reply part.

        Args:
            command: Command text; a newline is appended before sending.
            msgType: Keyword of the reply part to return, or ``"*"`` for the
                complete raw reply.

        Returns:
            The text produced by :meth:`listen` for ``msgType``.
        """
        # sendall() keeps sending until the whole command is transmitted;
        # plain send() may stop after a part of a long command (e.g. a file).
        self.S.sendall((command + '\n').encode())
        return self.listen(msgType=msgType)

    def listen(self, command_no=1, msgType="RESULT"):
        """Collect a reply from aRTist until it ends or the socket times out.

        Reading stops at the first received chunk that contains ``SUCCESS``
        or ``ERROR`` (the latter also increments ``self.error``), or when
        the socket times out, in which case ``RESULT Timeout`` is appended
        to the reply.  Chunks containing ``PROGRESS`` only update
        ``self.progress`` and are not added to the reply.  Afterwards
        ``self.answer`` is refreshed with the individual reply parts.

        Args:
            command_no: ``0`` for the read performed right after connecting
                (uses a 0.2 s timeout and prints the received text); any
                other value for a normal command reply.
            msgType: Keyword of the reply part to return, or ``"*"`` for the
                complete raw reply.

        Returns:
            The lines of the reply that start with ``msgType`` (prefix
            removed), ``"<msgType> not found."`` if there are none, or the
            raw reply for ``"*"``.

        Raises:
            ConnectionError: If aRTist closes the connection while a reply
                is awaited.
            OSError: For socket errors other than a timeout.
        """
        # NOTE(review): keywords are matched as substrings of each received
        # chunk, so a keyword split across two chunks, or one occurring inside
        # payload data, is misclassified.  A line-based parser would be more
        # robust -- confirm against the aRTist protocol before changing.
        answer = ""
        if command_no == 0:
            self.S.settimeout(0.2)
        while True:
            try:
                chunk = self.S.recv(self.bufferSize)
            except socket.timeout:
                answer += "RESULT Timeout\n"
                break
            if not chunk:
                # recv() returns b"" once the peer has closed the connection;
                # looping on would never terminate.
                self.S.settimeout(self.timeout)
                raise ConnectionError("aRTist closed the remote connection.")
            # A multi-byte character may be split across chunks; replace the
            # undecodable bytes instead of losing the whole chunk.
            msg = chunk.decode(errors="replace")
            if "SUCCESS" in msg:
                answer += msg
                break
            if "ERROR" in msg:
                answer += msg
                self.error += 1
                break
            if "PROGRESS" in msg:
                # Use the most recent PROGRESS line of the chunk; fall back
                # to 0 when no value can be parsed.
                self.progress = 0
                for line in reversed(msg.splitlines()):
                    line = line.strip()
                    if line.startswith("PROGRESS"):
                        try:
                            self.progress = float(line[len("PROGRESS"):])
                        except ValueError:
                            pass
                        break
                continue
            if command_no == 0:
                print(msg)
            answer += msg
        self.S.settimeout(self.timeout)
        stdout = self.pick(answer, "STDOUT")
        self.answer.update({
            "SUCCESS": self.pick(answer, "SUCCESS"),
            "RESULT": self.pick(answer, "RESULT"),
            "STDOUT": stdout,
            # Historic misspelling, kept so existing callers keep working.
            "SDTOUT": stdout,
            "BASE64": self.pick(answer, "BASE64"),
            "IMAGE": self.pick(answer, "IMAGE"),
            "FILE": self.pick(answer, "FILE"),
        })
        if msgType != "*":
            answer = self.pick(answer, msgType)
        return answer

    def pick(self, answer, res='RESULT'):
        """Extract the lines of a raw reply that start with a keyword.

        Args:
            answer: Raw reply text.
            res: Keyword the wanted lines start with.

        Returns:
            The matching lines with the keyword and the character following
            it removed, each terminated by a newline, or
            ``"<res> not found."`` if no line matches.
        """
        skip = 1 + len(res)  # keyword plus the separator after it
        picked = ''.join(
            line[skip:].strip('\r') + '\n'
            for line in answer.split('\n')
            if line.startswith(res)
        )
        return picked if picked else res + ' not found.'

    def get_answer(self, key):
        """Return one part of the most recent reply.

        Args:
            key: Message keyword, e.g. ``"RESULT"`` or ``"STDOUT"``.

        Raises:
            KeyError: If no reply part is stored under ``key``.
        """
        return self.answer[key]

    def save_image(self, imageName):
        """Decode the image of the most recent reply and save it to a file.

        The pixel data is taken from the ``BASE64`` part of the reply, the
        geometry and pixel type from the ``IMAGE`` part.

        Args:
            imageName: Path of the image file to write; the format is chosen
                by PIL from the file name.
        """
        # Maps the numeric type id in the image header to a NumPy type.
        # NOTE(review): the order looks like VTK's scalar type ids -- confirm.
        npTypes = [np.bool_, np.ubyte, np.byte, np.ubyte, np.short, np.ushort,
                   np.intc, np.uintc, np.int_, np.uint, np.single, np.double]
        decodedData = base64.b64decode(self.answer["BASE64"])
        imageHeader = self.answer["IMAGE"].split()
        dtype = npTypes[int(imageHeader[4])]
        # Header field 1 is used as the number of rows, field 0 as the
        # number of columns.
        shape = (int(imageHeader[1]), int(imageHeader[0]))
        im = np.frombuffer(decodedData, dtype).reshape(shape)
        Image.fromarray(im).save(imageName)

    def receive_file(self, fileName):
        """Write the base64 payload of the most recent reply to a file.

        Args:
            fileName: Path of the file to create or overwrite.
        """
        decodedFile = base64.b64decode(self.answer["BASE64"])
        # The context manager closes the file even if writing fails.
        with open(fileName, "wb") as artistFile:
            artistFile.write(decodedFile)

    def send_file(self, fileName):
        """Send a local file to aRTist as base64 text.

        Args:
            fileName: Path of the file to transfer; its suffix is passed on
                to aRTist together with the data.

        Returns:
            The ``RESULT`` part of aRTist's reply.
        """
        with open(fileName, "rb") as outFile:
            fileBytes = outFile.read()
        # Decode the base64 bytes directly: stripping the characters of
        # "b'" from str(bytes) would also remove leading 'b' characters
        # that belong to the payload.
        encString = base64.b64encode(fileBytes).decode("ascii")
        fileExtension = pathlib.Path(fileName).suffix
        com = "::RemoteControl::ReceiveFile " + encString + " " + fileExtension
        return self.send(com, "RESULT")
Classes
class Junction (host='localhost', port=3658, bufferSize=1024, timeout=5)
-
Remote control of aRTist simulator (this is a test)
Expand source code
class Junction:
    """Remote-control connection to a running aRTist instance.

    Creating an instance immediately opens a TCP connection and reads
    whatever aRTist sends right after connecting.  Commands are sent as
    newline-terminated text; replies are split into keyword-prefixed lines
    (``RESULT``, ``STDOUT``, ``BASE64``, ``IMAGE``, ``FILE``, ``SUCCESS``).

    Attributes:
        host: Host name or address of the aRTist instance.
        port: TCP port of the remote-access server.
        bufferSize: Maximum number of bytes read per ``recv`` call.
        timeout: Socket timeout in seconds used while waiting for replies.
        error: Number of replies so far that contained ``ERROR``.
        progress: Last value parsed from a ``PROGRESS`` message (0 if it
            could not be parsed).
        answer: Parts of the most recent reply, keyed by message keyword.
    """

    def __init__(self, host="localhost", port=3658, bufferSize=1024, timeout=5):
        """Store the connection settings and connect to aRTist.

        Args:
            host: Host name or address of the aRTist instance.
            port: TCP port of the remote-access server.
            bufferSize: Maximum number of bytes read per ``recv`` call.
            timeout: Socket timeout in seconds used while waiting for replies.
        """
        self.host = host
        self.port = port
        self.bufferSize = bufferSize
        self.timeout = timeout
        self.error = 0
        self.progress = 0
        self.answer = {}
        self.connect()

    def connect(self):
        """Open the TCP connection and consume aRTist's initial output.

        Returns:
            This instance, so calls can be chained.
        """
        self.S = socket.socket()  # TCP socket
        self.S.connect((self.host, self.port))
        self.S.settimeout(self.timeout)
        # command_no=0 makes listen() use a short timeout and print the text
        # aRTist sends on connect.
        self.listen(0)
        return self

    def send(self, command, msgType="RESULT"):
        """Send one command line to aRTist and return the selected reply part.

        Args:
            command: Command text; a newline is appended before sending.
            msgType: Keyword of the reply part to return, or ``"*"`` for the
                complete raw reply.

        Returns:
            The text produced by :meth:`listen` for ``msgType``.
        """
        # sendall() keeps sending until the whole command is transmitted;
        # plain send() may stop after a part of a long command (e.g. a file).
        self.S.sendall((command + '\n').encode())
        return self.listen(msgType=msgType)

    def listen(self, command_no=1, msgType="RESULT"):
        """Collect a reply from aRTist until it ends or the socket times out.

        Reading stops at the first received chunk that contains ``SUCCESS``
        or ``ERROR`` (the latter also increments ``self.error``), or when
        the socket times out, in which case ``RESULT Timeout`` is appended
        to the reply.  Chunks containing ``PROGRESS`` only update
        ``self.progress`` and are not added to the reply.  Afterwards
        ``self.answer`` is refreshed with the individual reply parts.

        Args:
            command_no: ``0`` for the read performed right after connecting
                (uses a 0.2 s timeout and prints the received text); any
                other value for a normal command reply.
            msgType: Keyword of the reply part to return, or ``"*"`` for the
                complete raw reply.

        Returns:
            The lines of the reply that start with ``msgType`` (prefix
            removed), ``"<msgType> not found."`` if there are none, or the
            raw reply for ``"*"``.

        Raises:
            ConnectionError: If aRTist closes the connection while a reply
                is awaited.
            OSError: For socket errors other than a timeout.
        """
        # NOTE(review): keywords are matched as substrings of each received
        # chunk, so a keyword split across two chunks, or one occurring inside
        # payload data, is misclassified.  A line-based parser would be more
        # robust -- confirm against the aRTist protocol before changing.
        answer = ""
        if command_no == 0:
            self.S.settimeout(0.2)
        while True:
            try:
                chunk = self.S.recv(self.bufferSize)
            except socket.timeout:
                answer += "RESULT Timeout\n"
                break
            if not chunk:
                # recv() returns b"" once the peer has closed the connection;
                # looping on would never terminate.
                self.S.settimeout(self.timeout)
                raise ConnectionError("aRTist closed the remote connection.")
            # A multi-byte character may be split across chunks; replace the
            # undecodable bytes instead of losing the whole chunk.
            msg = chunk.decode(errors="replace")
            if "SUCCESS" in msg:
                answer += msg
                break
            if "ERROR" in msg:
                answer += msg
                self.error += 1
                break
            if "PROGRESS" in msg:
                # Use the most recent PROGRESS line of the chunk; fall back
                # to 0 when no value can be parsed.
                self.progress = 0
                for line in reversed(msg.splitlines()):
                    line = line.strip()
                    if line.startswith("PROGRESS"):
                        try:
                            self.progress = float(line[len("PROGRESS"):])
                        except ValueError:
                            pass
                        break
                continue
            if command_no == 0:
                print(msg)
            answer += msg
        self.S.settimeout(self.timeout)
        stdout = self.pick(answer, "STDOUT")
        self.answer.update({
            "SUCCESS": self.pick(answer, "SUCCESS"),
            "RESULT": self.pick(answer, "RESULT"),
            "STDOUT": stdout,
            # Historic misspelling, kept so existing callers keep working.
            "SDTOUT": stdout,
            "BASE64": self.pick(answer, "BASE64"),
            "IMAGE": self.pick(answer, "IMAGE"),
            "FILE": self.pick(answer, "FILE"),
        })
        if msgType != "*":
            answer = self.pick(answer, msgType)
        return answer

    def pick(self, answer, res='RESULT'):
        """Extract the lines of a raw reply that start with a keyword.

        Args:
            answer: Raw reply text.
            res: Keyword the wanted lines start with.

        Returns:
            The matching lines with the keyword and the character following
            it removed, each terminated by a newline, or
            ``"<res> not found."`` if no line matches.
        """
        skip = 1 + len(res)  # keyword plus the separator after it
        picked = ''.join(
            line[skip:].strip('\r') + '\n'
            for line in answer.split('\n')
            if line.startswith(res)
        )
        return picked if picked else res + ' not found.'

    def get_answer(self, key):
        """Return one part of the most recent reply.

        Args:
            key: Message keyword, e.g. ``"RESULT"`` or ``"STDOUT"``.

        Raises:
            KeyError: If no reply part is stored under ``key``.
        """
        return self.answer[key]

    def save_image(self, imageName):
        """Decode the image of the most recent reply and save it to a file.

        The pixel data is taken from the ``BASE64`` part of the reply, the
        geometry and pixel type from the ``IMAGE`` part.

        Args:
            imageName: Path of the image file to write; the format is chosen
                by PIL from the file name.
        """
        # Maps the numeric type id in the image header to a NumPy type.
        # NOTE(review): the order looks like VTK's scalar type ids -- confirm.
        npTypes = [np.bool_, np.ubyte, np.byte, np.ubyte, np.short, np.ushort,
                   np.intc, np.uintc, np.int_, np.uint, np.single, np.double]
        decodedData = base64.b64decode(self.answer["BASE64"])
        imageHeader = self.answer["IMAGE"].split()
        dtype = npTypes[int(imageHeader[4])]
        # Header field 1 is used as the number of rows, field 0 as the
        # number of columns.
        shape = (int(imageHeader[1]), int(imageHeader[0]))
        im = np.frombuffer(decodedData, dtype).reshape(shape)
        Image.fromarray(im).save(imageName)

    def receive_file(self, fileName):
        """Write the base64 payload of the most recent reply to a file.

        Args:
            fileName: Path of the file to create or overwrite.
        """
        decodedFile = base64.b64decode(self.answer["BASE64"])
        # The context manager closes the file even if writing fails.
        with open(fileName, "wb") as artistFile:
            artistFile.write(decodedFile)

    def send_file(self, fileName):
        """Send a local file to aRTist as base64 text.

        Args:
            fileName: Path of the file to transfer; its suffix is passed on
                to aRTist together with the data.

        Returns:
            The ``RESULT`` part of aRTist's reply.
        """
        with open(fileName, "rb") as outFile:
            fileBytes = outFile.read()
        # Decode the base64 bytes directly: stripping the characters of
        # "b'" from str(bytes) would also remove leading 'b' characters
        # that belong to the payload.
        encString = base64.b64encode(fileBytes).decode("ascii")
        fileExtension = pathlib.Path(fileName).suffix
        com = "::RemoteControl::ReceiveFile " + encString + " " + fileExtension
        return self.send(com, "RESULT")
Methods
def connect(self)
-
Expand source code
def connect(self):
    """Open the TCP connection to aRTist and read its initial output.

    A new socket is stored in ``self.S``, connected to
    ``(self.host, self.port)`` and given ``self.timeout`` as its timeout.
    ``listen(0)`` is then called to consume what aRTist sends on connect.

    Returns:
        This instance.
    """
    self.S = sock = socket.socket()  # TCP socket
    sock.connect((self.host, self.port))
    sock.settimeout(self.timeout)
    self.listen(0)
    return self
def get_answer(self, key)
-
Expand source code
def get_answer(self, key):
    """Return the part of the most recent reply stored under ``key``.

    Raises:
        KeyError: If ``self.answer`` has no entry for ``key``.
    """
    stored = self.answer
    return stored[key]
def listen(self, command_no=1, msgType='RESULT')
-
Expand source code
def listen(self, command_no=1, msgType="RESULT"):
    """Collect a reply from aRTist until it ends or the socket times out.

    Reading stops at the first received chunk that contains ``SUCCESS`` or
    ``ERROR`` (the latter also increments ``self.error``), or when the
    socket times out, in which case ``RESULT Timeout`` is appended to the
    reply.  Chunks containing ``PROGRESS`` only update ``self.progress``
    and are not added to the reply.  Afterwards ``self.answer`` is
    refreshed with the individual reply parts.

    Args:
        command_no: ``0`` for the read performed right after connecting
            (uses a 0.2 s timeout and prints the received text); any other
            value for a normal command reply.
        msgType: Keyword of the reply part to return, or ``"*"`` for the
            complete raw reply.

    Returns:
        The lines of the reply that start with ``msgType`` (prefix removed),
        ``"<msgType> not found."`` if there are none, or the raw reply for
        ``"*"``.

    Raises:
        ConnectionError: If aRTist closes the connection while a reply is
            awaited.
        OSError: For socket errors other than a timeout.
    """
    # NOTE(review): keywords are matched as substrings of each received
    # chunk, so a keyword split across two chunks, or one occurring inside
    # payload data, is misclassified.  A line-based parser would be more
    # robust -- confirm against the aRTist protocol before changing.
    answer = ""
    if command_no == 0:
        self.S.settimeout(0.2)
    while True:
        try:
            chunk = self.S.recv(self.bufferSize)
        except socket.timeout:
            answer += "RESULT Timeout\n"
            break
        if not chunk:
            # recv() returns b"" once the peer has closed the connection;
            # looping on would never terminate.
            self.S.settimeout(self.timeout)
            raise ConnectionError("aRTist closed the remote connection.")
        # A multi-byte character may be split across chunks; replace the
        # undecodable bytes instead of losing the whole chunk.
        msg = chunk.decode(errors="replace")
        if "SUCCESS" in msg:
            answer += msg
            break
        if "ERROR" in msg:
            answer += msg
            self.error += 1
            break
        if "PROGRESS" in msg:
            # Use the most recent PROGRESS line of the chunk; fall back to 0
            # when no value can be parsed.
            self.progress = 0
            for line in reversed(msg.splitlines()):
                line = line.strip()
                if line.startswith("PROGRESS"):
                    try:
                        self.progress = float(line[len("PROGRESS"):])
                    except ValueError:
                        pass
                    break
            continue
        if command_no == 0:
            print(msg)
        answer += msg
    self.S.settimeout(self.timeout)
    stdout = self.pick(answer, "STDOUT")
    self.answer.update({
        "SUCCESS": self.pick(answer, "SUCCESS"),
        "RESULT": self.pick(answer, "RESULT"),
        "STDOUT": stdout,
        # Historic misspelling, kept so existing callers keep working.
        "SDTOUT": stdout,
        "BASE64": self.pick(answer, "BASE64"),
        "IMAGE": self.pick(answer, "IMAGE"),
        "FILE": self.pick(answer, "FILE"),
    })
    if msgType != "*":
        answer = self.pick(answer, msgType)
    return answer
def pick(self, answer, res='RESULT')
-
Expand source code
def pick(self, answer, res='RESULT'):
    """Extract the lines of a raw reply that start with a keyword.

    Args:
        answer: Raw reply text.
        res: Keyword the wanted lines start with.

    Returns:
        The matching lines with the keyword and the character following it
        removed, each terminated by a newline, or ``"<res> not found."`` if
        no line matches.
    """
    skip = 1 + len(res)  # keyword plus the separator after it
    matches = [
        line[skip:].strip('\r') + '\n'
        for line in answer.split('\n')
        if line.startswith(res)
    ]
    if not matches:
        return res + ' not found.'
    return ''.join(matches)
def receive_file(self, fileName)
-
Expand source code
def receive_file(self, fileName):
    """Write the base64 payload of the most recent reply to a file.

    The payload is read from ``self.answer["BASE64"]`` and decoded before
    the file is opened.

    Args:
        fileName: Path of the file to create or overwrite.
    """
    decodedFile = base64.b64decode(self.answer["BASE64"])
    # The context manager closes the file even if writing fails.
    with open(fileName, "wb") as artistFile:
        artistFile.write(decodedFile)
def save_image(self, imageName)
-
Expand source code
def save_image(self, imageName):
    """Decode the image of the most recent reply and save it to a file.

    The pixel data is taken from ``self.answer["BASE64"]``, the geometry
    and pixel type from the whitespace-separated ``self.answer["IMAGE"]``
    header.

    Args:
        imageName: Path of the image file to write via PIL.
    """
    # Maps the numeric type id in header field 4 to a NumPy type.
    # NOTE(review): the order looks like VTK's scalar type ids -- confirm.
    dtype_by_id = (
        np.bool_, np.ubyte, np.byte, np.ubyte, np.short, np.ushort,
        np.intc, np.uintc, np.int_, np.uint, np.single, np.double,
    )
    raw = base64.b64decode(self.answer["BASE64"])
    header = self.answer["IMAGE"].split()
    pixel_type = dtype_by_id[int(header[4])]
    flat = np.frombuffer(raw, pixel_type)
    # Header field 1 is used as the row count, field 0 as the column count.
    n_rows = int(header[1])
    n_cols = int(header[0])
    Image.fromarray(flat.reshape((n_rows, n_cols))).save(imageName)
def send(self, command, msgType='RESULT')
-
Expand source code
def send(self, command, msgType="RESULT"):
    """Send one command line to aRTist and return the selected reply part.

    Args:
        command: Command text; a newline is appended before sending.
        msgType: Keyword of the reply part to return, passed on to
            ``listen``.

    Returns:
        Whatever ``self.listen(msgType=msgType)`` returns.
    """
    # sendall() keeps sending until the whole command is transmitted; plain
    # send() may stop after a part of a long command (e.g. an encoded file).
    self.S.sendall((command + '\n').encode())
    return self.listen(msgType=msgType)
def send_file(self, fileName)
-
Expand source code
def send_file(self, fileName):
    """Send a local file to aRTist as base64 text.

    The file content is base64-encoded and passed, together with the file's
    suffix, to the ``::RemoteControl::ReceiveFile`` command.

    Args:
        fileName: Path of the file to transfer.

    Returns:
        The ``RESULT`` part of aRTist's reply, as returned by ``send``.
    """
    with open(fileName, "rb") as outFile:
        fileBytes = outFile.read()
    # Decode the base64 bytes directly: stripping the characters of "b'"
    # from str(bytes) would also remove leading 'b' characters that belong
    # to the payload.
    encString = base64.b64encode(fileBytes).decode("ascii")
    fileExtension = pathlib.Path(fileName).suffix
    com = "::RemoteControl::ReceiveFile " + encString + " " + fileExtension
    return self.send(com, "RESULT")