Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions src/bitmessagemain.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import helper_bootstrap
import helper_generic
import helper_threading
import std_io # for STDIO modes


def connectToStream(streamNumber):
Expand Down Expand Up @@ -205,8 +206,8 @@ def start(self):

try:
opts, args = getopt.getopt(
sys.argv[1:], "hcdt",
["help", "curses", "daemon", "test"])
sys.argv[1:], "hcdtn",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option '-n' often has connotations of 'dry run' or 'numeric only'. While we don't have anything using 'n' right now I'm thinking of avoiding future confusion if we do. Is there another letter that you would say makes as much sense as 'n'?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of that and decided that none of the common uses of "-n" (dry-run, no-resolve, no-output, numeric etc) are applicable in the Bitmessage context.
A second best option would be "-c" (netCat) which is already taken.

["help", "curses", "daemon", "test", "mode-netcat"])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other options are modes too. Perhaps you could drop the "mode-"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See general comment, there's a naming theme.


except getopt.GetoptError:
self.usage()
Expand All @@ -224,6 +225,14 @@ def start(self):
elif opt in ("-t", "--test"):
state.testmode = daemon = True
state.enableGUI = False # run without a UI
elif opt in ("-n", "--mode-netcat"):
# STDIO MODES - reconfigure threads for netcat mode
state.enableNetwork = True # enable networking
state.enableObjProc = False # disable object processing
state.enableGUI = False # headless
state.enableSTDIO = True # enable STDIO
# STDIN to invQueue, STDOUT from objectProcessorQueue
std_io.stdInputMode = 'netcat'

# is the application already running? If yes then exit.
shared.thisapp = singleinstance("", daemon)
Expand Down Expand Up @@ -263,7 +272,12 @@ def start(self):
sqlLookup.start()

Inventory() # init
Dandelion() # init, needs to be early because other thread may access it early

# start network components if networking is enabled
if state.enableNetwork:
Dandelion() # init, needs to be early because other thread may access it early
else:
state.dandelion = 0

# Enable object processor and SMTP only if objproc enabled
if state.enableObjProc:
Expand All @@ -283,6 +297,12 @@ def start(self):
objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object.
objectProcessorThread.start()

elif state.enableSTDIO and std_io.stdInputMode == 'netcat':
# Start the thread that outputs objects to stdout in netcat mode
objectStdOutThread = std_io.objectStdOut()
objectStdOutThread.daemon = False # same as objectProcessorThread
objectStdOutThread.start()

# Start the cleanerThread
singleCleanerThread = singleCleaner()
singleCleanerThread.daemon = True # close the main program even if there are threads left
Expand All @@ -309,6 +329,12 @@ def start(self):
singleAPIThread.daemon = True # close the main program even if there are threads left
singleAPIThread.start()

# STDIO MODES - Start the STDIN thread
if state.enableSTDIO:
stdinThread = std_io.stdInput(sys.stdin)
stdinThread.daemon = True
stdinThread.start()

# start network components if networking is enabled
if state.enableNetwork:
BMConnectionPool()
Expand Down Expand Up @@ -440,7 +466,10 @@ def usage(self):
-h, --help show this help message and exit
-c, --curses use curses (text mode) interface
-d, --daemon run in daemon (background) mode

Advanced modes:
-t, --test dryrun, make testing
-n, --mode-netcat no GUI, read and write raw objects on STDIO

All parameters are optional.
'''
Expand Down
167 changes: 167 additions & 0 deletions src/std_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""
STDIO handling threads for netcat and airgap modes

stdInput thread: receives hex-encoded bitmessage objects on STDIN
Supported input formats, format is auto-detected:
- each line a hex-encoded object
- each line formatted: hex_timestamp - tab - hex-encoded_object
(the output format of netcat mode)

objectStdOut thread: replaces the objectProcessor thread in netcat mode,
outputs to STDOUT in format: hex_timestamp - tab - hex-encoded_object
"""

import threading
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the imports in a consistent order: all plain `import` statements should come first,
and the `from ... import ...` statements should be written after them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

import time
import protocol
import queues
import state
import shutdown
import shared # statusIconColor

from struct import unpack
from binascii import hexlify, unhexlify
from addresses import decodeVarint, calculateInventoryHash
from debug import logger
from inventory import Inventory
from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute

stdInputMode = 'netcat' # process STDIN in netcat mode by default
netcatExitOnEOF = True # exit program if EOF on STDIN


class stdInput(threading.Thread):
"""
Standard Input thread reads objects from STDIN, posts them to Inventory
"""

def __init__(self, inputSrc):
threading.Thread.__init__(self, name="stdInput")
self.inputSrc = inputSrc
logger.info('stdInput thread started.')

def run(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please write a method with only 15 or 20 lines.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Go big or go home :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Over the longer term, I would also prefer to have shorter methods; some parts of the old code, like class_objectProcessor, are too long, but for this PR it's fine the way it is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted with thanks. I think the ad-hoc object parsing accounts for a lot of avoidable LLOC wastage; however, as discussed in PR #1149, there was no readily usable parser function to use instead.

while True:
# read a line in hex encoding
line = self.inputSrc.readline()
if not line:
logger.info("STDIN: End of input")
if netcatExitOnEOF:
shutdown.doCleanShutdown()
break

hexObject = line.rstrip()
hexTStamp = ''

# detect timestamp-tab-object format (as output by netcat mode)
if "\t" in hexObject:
hexTStamp = hexObject.split("\t")[0]
hexObject = hexObject.split("\t")[-1]

# unhex the input with error rejection
try:
binObject = unhexlify(hexObject)
except TypeError: # fix codacy warning
logger.info("STDIN: Invalid input format")
continue

# sanity check on object size
if len(binObject) < 22:
logger.info("STDIN: Invalid object size")
continue

if not state.enableNetwork and state.enableGUI:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You set state.enableGUI = False above. Will this conditional ever be reached?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state.enableGUI is only false in netcat mode; std_io may have other uses outside of netcat mode, which are not currently included. Yes, the conditional makes sense from a logical perspective.

# in airgap mode, trick the status icon that we are in fact
# NOT waiting for a connection
# (may be removed after impact analysis)
shared.statusIconColor = 'yellow'

if stdInputMode == 'airgap':
# airgap mode uses the timestamp
# unhex the timestamp with error rejection
if len(hexTStamp) == 16:
try:
# stdioStamp, = unpack('>Q', unhexlify(hexTStamp))
_, = unpack('>Q', unhexlify(hexTStamp))
except (struct.error, TypeError): # fix codacy warning
logger.info("STDIN: Invalid timestamp format: " + hexTStamp)
continue

# check that proof of work is sufficient.
if not protocol.isProofOfWorkSufficient(binObject):
logger.info('STDIN: Insufficient Proof of Work')
continue

# extract expiry time, object type
eTime, = unpack('>Q', binObject[8:16])
objectType, = unpack('>I', binObject[16:20])

# extract version number and stream number
readPosition = 20 # bypass the nonce, time, and object type
# versionNumber, versionLength
_, versionLength = decodeVarint(binObject[readPosition:readPosition + 10])
readPosition += versionLength
streamNumber, streamNumberLength = decodeVarint(binObject[readPosition:readPosition + 10])
readPosition += streamNumberLength

# calculate inventory hash
inventoryHash = calculateInventoryHash(binObject)

# duplicate check on inventory hash (both netcat and airgap)
if inventoryHash in Inventory():
logger.debug("STDIN: Already got object " + hexlify(inventoryHash))
continue

# in netcat mode, push object to inventory and id to output queue
if stdInputMode == 'netcat':
Inventory()[inventoryHash] = (objectType, streamNumber, binObject, eTime, '')
logger.debug("STDIN: Accepted object (type=%u) " % objectType + hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))

# honour global shutdown flag
if state.shutdown != 0:
logger.info('stdInput thread shutdown.')
break


class objectStdOut(threading.Thread):
"""
The objectStdOut thread receives network objects from the receiveDataThreads.
"""
def __init__(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the PEP 8 style guide here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

threading.Thread.__init__(self, name="objectStdOut")

# REFACTOR THIS with objectProcessor into objectProcessorQueue
queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''')
for row in queryreturn:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By waiting until we have synchronously pulled all objects from the database and put all objects on the queue we are missing out on some benefits of parallelism and we'll hit a memory limit with large data sets. Maybe this is unavoidable or related to your suggested refactoring.

Also, keeping the number of places where raw SQL is used to a minimum is a good idea. Again, I assume this would be part of your suggested refactoring.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The block marked by refactor ... /refactor is duplicated verbatim from class objectProcessor; the comment indicates my intention to refactor it into class objectProcessorQueue, a task which is outside the scope of this PR.

objectType, data = row
queues.objectProcessorQueue.put((objectType, data))
sqlExecute('''DELETE FROM objectprocessorqueue''')
logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP 8 styling is missing here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

# /REFACTOR THIS

def run(self):
while True:
Copy link
Copy Markdown
Contributor

@coffeedogs coffeedogs May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the queue is empty would this not cause unnecessary 100% CPU? Perhaps a small sleep is needed inside the while loop?

Edit: no it wouldn't, Queue.get(block=True) blocks when the queue is empty. I was thinking of the AMQP library I was most recently using.

objectType, data = queues.objectProcessorQueue.get()

# Output in documented format
print "%016x" % int(time.time()) + '\t' + hexlify(data)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use sys.stdout.write() instead of print here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be indeed the logical choice, I'm just not sure how cross-platform it would be. Print is 100% portable and not wrong.


if state.shutdown:
time.sleep(.5) # Wait just a moment for most of the connections to close

# REFACTOR THIS with objectProcessor into objectProcessorQueue
numberOfObjectsInObjProcQueue = 0
with SqlBulkExecute() as sql:
while queues.objectProcessorQueue.curSize > 0:
objectType, data = queues.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType, data)
numberOfObjectsInObjProcQueue += 1
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' %
str(numberOfObjectsInObjProcQueue))
# /REFACTOR THIS
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please!


state.shutdown = 2
break