Package artistlib

The aRTist Python library is intended to remote control and automate the radiographic simulator aRTist.

Hint: This project is in its early stages and may change significantly over time.

Getting started

Requirements

A Python 3 environment (Python 3.8 or higher) is required. The following Python packages are required as well. They usually come with a Python environment, or can be installed there easily:

  • none so far.

Installation

If you want to install the package in your Python environment, you can use pip. For example, you can run the following command to make the toolbox available:

pip install artistlib

To use the package without installation, you need to download the package manually. You have the following three options:

  • Download the package from PyPi. You will get a zipped file called artistlib-X.X.X.tar.gz (where X.X.X is the current version number). Open or unpack this file to get to its contents.
  • Download the repository from GitHub: press the clone button to download a ZIP file. Unpack it or open it to see its contents.
  • You can also clone the repository from GitHub via the terminal:

    git clone https://github.com/BAMresearch/aRTist-PythonLib.git

From any of these three options, you will get the complete package source code. It contains a folder called artistlib. If you want to use the library from your Python scripts without installing it, you need to copy the folder artistlib to the location of your Python scripts to make the package available.

Usage

Please make sure that aRTist is running and waiting for remote connection (menu: Tools>Enable remote access).

The following example code shows the basic usage:

import artistlib as a

# connect to aRTist (default: host='localhost', port=3658) 
rc = a.Junction()

# print aRTist's version number
ver = a.Junction.send(rc, '::aRTist::GetVersion full')
print(ver)

# - load a project
# - run the simulation
# - save the resulting projection from the image viewer
a.Junction.send(rc, 'FileIO::OpenAny $Xray(LibraryDir)/ExampleProjects/Schlumpfmark.aRTist')
a.Junction.send(rc, 'Engine::GoCmd')
ans = "Saved as: "
ans += a.Junction.send(rc, 'Modules::Invoke ImageViewer SaveFile [file join $env(HOME) Pictures/artistlib.tif] true')
print(ans)

# - load a project
# - run the simulation without viewing the result
# - transfer the final projection 
# - saving the projection image  >>>> This requires aRTist 2.12.7 or higher! <<<<
# - delete the images to release the memory
a.Junction.send(rc, 'FileIO::OpenAny $Xray(LibraryDir)/ExampleProjects/aRTist.aRTist')
a.Junction.send(rc, 'set imgList [Engine::Go]')
a.Junction.send(rc, 'RemoteControl::SendImage [lindex $imgList 0]')
a.Junction.save_image(rc, "transferred.tif")
a.Junction.send(rc, 'foreach i $imgList {$i Delete}')
print("Saved as: transferred.tif")

About

The aRTist library was developed for utilize the Python scripting of the radiographic simulator aRTist.

The software is released under the Apache 2.0 license, its source code is available on GitHub.

Contributors

The following people contributed to code and documentation of the package:

  • Alexander Funke
  • David Schumacher
  • Carsten Bellon
  • David Plotzki
  • David Denkler
Expand source code
# -*- coding: UTF-8 -*-
"""The aRTist Python library is intended to remote control and automate the radiographic simulator aRTist.

.. include:: ./documentation.md
"""

__pdoc__ = {'console': False, 'remote_access': False}

from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

import numpy as np
from numpy import append
import socket
import base64
import pathlib
from PIL import Image
class Junction:
    """Remote control of aRTist simulator (this is a test)
    """
    def __init__(self, host="localhost", port=3658, bufferSize=1024, timeout=5):
        self.host = host
        self.port = port
        self.bufferSize = bufferSize
        self.timeout = timeout
        self.error = 0
        self.progress = 0
        self.answer = {}

        self.connect()

    def connect(self):
        self.S = socket.socket()                        # Create socket (for TCP)
        self.S.connect((self.host, self.port))          # Connect to aRTist
        self.S.settimeout(self.timeout)
        self.listen(0)
        return self

    def send(self, command, msgType="RESULT"):
        c = command + '\n'
        self.S.send(c.encode())
        return self.listen(msgType=msgType)

    def listen(self, command_no=1, msgType="RESULT"):
        answer = ""
        stop = False
        if (command_no == 0):
            self.S.settimeout(0.2)
        while (not stop):# and ("SUCCESS" not in total) and ("ERROR" not in total):     # Solange server antwortet und nicht "SUCCESS" enthält
            try:
                msg = self.S.recv(self.bufferSize).decode()
            except BaseException as e:
                err = e.args[0]
                if err == "timed out":
                    #print("Timeout\n")
                    answer += "RESULT Timeout\n"
                    #print(answer)
                    stop = True
                    continue
            else:
                if ("SUCCESS" in msg):
                    answer += msg
                    stop = True
                    continue
                elif ("ERROR" in msg):
                    answer += msg
                    stop = True
                    #global error
                    self.error = self.error + 1 
                    continue
                elif ("PROGRESS" in msg):
                    try:
                        self.progress = float(msg.strip('PROGRESS '))
                    except:
                        self.progress = 0
                    continue
                else:
                    if (command_no == 0):
                        print(msg)
                    answer += msg
        self.S.settimeout(self.timeout)
        self.answer.update({"SUCCESS":self.pick(answer, "SUCCESS"), "RESULT":self.pick(answer, "RESULT"), "SDTOUT":self.pick(answer, "STDOUT"), "BASE64":self.pick(answer, "BASE64"), "IMAGE":self.pick(answer, "IMAGE"), "FILE":self.pick(answer, "FILE")})
        if (msgType != "*"):
            answer = self.pick(answer, msgType)
        return answer

    def pick(self, answer, res='RESULT'):
        picked = ''
        for a in answer.split('\n'):
            if a.find(res) == 0:
                picked += a[1 + len(res):].strip('\r') + '\n'
        if len(picked) == 0:
            return res + ' not found.'
        return picked

    def get_answer(self, key):
        return self.answer[key]

    def save_image(self, imageName):
        npTypes = [np.bool_, np.ubyte, np.byte, np.ubyte, np.short, np.ushort, np.intc, np.uintc, np.int_, np.uint, np.single, np.double]
        imageData = self.answer["BASE64"]
        decodedData = base64.b64decode((imageData))
        imageHeader = self.answer["IMAGE"].split()
        dtype = npTypes[int(imageHeader[4])]
        im = np.frombuffer(decodedData, dtype).reshape((int(imageHeader[1]),int(imageHeader[0])))
        Image.fromarray(im).save(imageName)

    def receive_file(self, fileName):
        fileData = self.answer["BASE64"]
        decodedFile = base64.b64decode((fileData))
        artistFile = open(fileName, "wb")
        artistFile.write(decodedFile)
        artistFile.close()

    def send_file(self, fileName):
        outFile = open(fileName, "br")
        fileBytes = outFile.read()
        encBytes = base64.b64encode((fileBytes))
        encString = str(encBytes)
        encString = encString.lstrip("b'").rstrip("'")
        fileExtension = pathlib.Path(fileName).suffix
        com = "::RemoteControl::ReceiveFile " + encString + " " + fileExtension
        recAnswer = self.send(com, "RESULT")
        return recAnswer

Classes

class Junction (host='localhost', port=3658, bufferSize=1024, timeout=5)

Remote control of aRTist simulator (this is a test)

Expand source code
class Junction:
    """Remote control of aRTist simulator (this is a test)
    """
    def __init__(self, host="localhost", port=3658, bufferSize=1024, timeout=5):
        self.host = host
        self.port = port
        self.bufferSize = bufferSize
        self.timeout = timeout
        self.error = 0
        self.progress = 0
        self.answer = {}

        self.connect()

    def connect(self):
        self.S = socket.socket()                        # Create socket (for TCP)
        self.S.connect((self.host, self.port))          # Connect to aRTist
        self.S.settimeout(self.timeout)
        self.listen(0)
        return self

    def send(self, command, msgType="RESULT"):
        c = command + '\n'
        self.S.send(c.encode())
        return self.listen(msgType=msgType)

    def listen(self, command_no=1, msgType="RESULT"):
        answer = ""
        stop = False
        if (command_no == 0):
            self.S.settimeout(0.2)
        while (not stop):# and ("SUCCESS" not in total) and ("ERROR" not in total):     # Solange server antwortet und nicht "SUCCESS" enthält
            try:
                msg = self.S.recv(self.bufferSize).decode()
            except BaseException as e:
                err = e.args[0]
                if err == "timed out":
                    #print("Timeout\n")
                    answer += "RESULT Timeout\n"
                    #print(answer)
                    stop = True
                    continue
            else:
                if ("SUCCESS" in msg):
                    answer += msg
                    stop = True
                    continue
                elif ("ERROR" in msg):
                    answer += msg
                    stop = True
                    #global error
                    self.error = self.error + 1 
                    continue
                elif ("PROGRESS" in msg):
                    try:
                        self.progress = float(msg.strip('PROGRESS '))
                    except:
                        self.progress = 0
                    continue
                else:
                    if (command_no == 0):
                        print(msg)
                    answer += msg
        self.S.settimeout(self.timeout)
        self.answer.update({"SUCCESS":self.pick(answer, "SUCCESS"), "RESULT":self.pick(answer, "RESULT"), "SDTOUT":self.pick(answer, "STDOUT"), "BASE64":self.pick(answer, "BASE64"), "IMAGE":self.pick(answer, "IMAGE"), "FILE":self.pick(answer, "FILE")})
        if (msgType != "*"):
            answer = self.pick(answer, msgType)
        return answer

    def pick(self, answer, res='RESULT'):
        picked = ''
        for a in answer.split('\n'):
            if a.find(res) == 0:
                picked += a[1 + len(res):].strip('\r') + '\n'
        if len(picked) == 0:
            return res + ' not found.'
        return picked

    def get_answer(self, key):
        return self.answer[key]

    def save_image(self, imageName):
        npTypes = [np.bool_, np.ubyte, np.byte, np.ubyte, np.short, np.ushort, np.intc, np.uintc, np.int_, np.uint, np.single, np.double]
        imageData = self.answer["BASE64"]
        decodedData = base64.b64decode((imageData))
        imageHeader = self.answer["IMAGE"].split()
        dtype = npTypes[int(imageHeader[4])]
        im = np.frombuffer(decodedData, dtype).reshape((int(imageHeader[1]),int(imageHeader[0])))
        Image.fromarray(im).save(imageName)

    def receive_file(self, fileName):
        fileData = self.answer["BASE64"]
        decodedFile = base64.b64decode((fileData))
        artistFile = open(fileName, "wb")
        artistFile.write(decodedFile)
        artistFile.close()

    def send_file(self, fileName):
        outFile = open(fileName, "br")
        fileBytes = outFile.read()
        encBytes = base64.b64encode((fileBytes))
        encString = str(encBytes)
        encString = encString.lstrip("b'").rstrip("'")
        fileExtension = pathlib.Path(fileName).suffix
        com = "::RemoteControl::ReceiveFile " + encString + " " + fileExtension
        recAnswer = self.send(com, "RESULT")
        return recAnswer

Methods

def connect(self)
Expand source code
def connect(self):
    self.S = socket.socket()                        # Create socket (for TCP)
    self.S.connect((self.host, self.port))          # Connect to aRTist
    self.S.settimeout(self.timeout)
    self.listen(0)
    return self
def get_answer(self, key)
Expand source code
def get_answer(self, key):
    return self.answer[key]
def listen(self, command_no=1, msgType='RESULT')
Expand source code
def listen(self, command_no=1, msgType="RESULT"):
    answer = ""
    stop = False
    if (command_no == 0):
        self.S.settimeout(0.2)
    while (not stop):# and ("SUCCESS" not in total) and ("ERROR" not in total):     # Solange server antwortet und nicht "SUCCESS" enthält
        try:
            msg = self.S.recv(self.bufferSize).decode()
        except BaseException as e:
            err = e.args[0]
            if err == "timed out":
                #print("Timeout\n")
                answer += "RESULT Timeout\n"
                #print(answer)
                stop = True
                continue
        else:
            if ("SUCCESS" in msg):
                answer += msg
                stop = True
                continue
            elif ("ERROR" in msg):
                answer += msg
                stop = True
                #global error
                self.error = self.error + 1 
                continue
            elif ("PROGRESS" in msg):
                try:
                    self.progress = float(msg.strip('PROGRESS '))
                except:
                    self.progress = 0
                continue
            else:
                if (command_no == 0):
                    print(msg)
                answer += msg
    self.S.settimeout(self.timeout)
    self.answer.update({"SUCCESS":self.pick(answer, "SUCCESS"), "RESULT":self.pick(answer, "RESULT"), "SDTOUT":self.pick(answer, "STDOUT"), "BASE64":self.pick(answer, "BASE64"), "IMAGE":self.pick(answer, "IMAGE"), "FILE":self.pick(answer, "FILE")})
    if (msgType != "*"):
        answer = self.pick(answer, msgType)
    return answer
def pick(self, answer, res='RESULT')
Expand source code
def pick(self, answer, res='RESULT'):
    picked = ''
    for a in answer.split('\n'):
        if a.find(res) == 0:
            picked += a[1 + len(res):].strip('\r') + '\n'
    if len(picked) == 0:
        return res + ' not found.'
    return picked
def receive_file(self, fileName)
Expand source code
def receive_file(self, fileName):
    fileData = self.answer["BASE64"]
    decodedFile = base64.b64decode((fileData))
    artistFile = open(fileName, "wb")
    artistFile.write(decodedFile)
    artistFile.close()
def save_image(self, imageName)
Expand source code
def save_image(self, imageName):
    npTypes = [np.bool_, np.ubyte, np.byte, np.ubyte, np.short, np.ushort, np.intc, np.uintc, np.int_, np.uint, np.single, np.double]
    imageData = self.answer["BASE64"]
    decodedData = base64.b64decode((imageData))
    imageHeader = self.answer["IMAGE"].split()
    dtype = npTypes[int(imageHeader[4])]
    im = np.frombuffer(decodedData, dtype).reshape((int(imageHeader[1]),int(imageHeader[0])))
    Image.fromarray(im).save(imageName)
def send(self, command, msgType='RESULT')
Expand source code
def send(self, command, msgType="RESULT"):
    c = command + '\n'
    self.S.send(c.encode())
    return self.listen(msgType=msgType)
def send_file(self, fileName)
Expand source code
def send_file(self, fileName):
    outFile = open(fileName, "br")
    fileBytes = outFile.read()
    encBytes = base64.b64encode((fileBytes))
    encString = str(encBytes)
    encString = encString.lstrip("b'").rstrip("'")
    fileExtension = pathlib.Path(fileName).suffix
    com = "::RemoteControl::ReceiveFile " + encString + " " + fileExtension
    recAnswer = self.send(com, "RESULT")
    return recAnswer