-
Notifications
You must be signed in to change notification settings - Fork 573
implement netcat operating mode #1222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v0.6
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,6 +66,7 @@ | |
| import helper_bootstrap | ||
| import helper_generic | ||
| import helper_threading | ||
| import std_io # for STDIO modes | ||
|
|
||
|
|
||
| def connectToStream(streamNumber): | ||
|
|
@@ -205,8 +206,8 @@ def start(self): | |
|
|
||
| try: | ||
| opts, args = getopt.getopt( | ||
| sys.argv[1:], "hcdt", | ||
| ["help", "curses", "daemon", "test"]) | ||
| sys.argv[1:], "hcdtn", | ||
| ["help", "curses", "daemon", "test", "mode-netcat"]) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other options are modes too. Perhaps you could drop the "mode-"?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See general comment, there's a naming theme. |
||
|
|
||
| except getopt.GetoptError: | ||
| self.usage() | ||
|
|
@@ -224,6 +225,14 @@ def start(self): | |
| elif opt in ("-t", "--test"): | ||
| state.testmode = daemon = True | ||
| state.enableGUI = False # run without a UI | ||
| elif opt in ("-n", "--mode-netcat"): | ||
| # STDIO MODES - reconfigure threads for netcat mode | ||
| state.enableNetwork = True # enable networking | ||
| state.enableObjProc = False # disable object processing | ||
| state.enableGUI = False # headless | ||
| state.enableSTDIO = True # enable STDIO | ||
| # STDIN to invQueue, STDOUT from objectProcessorQueue | ||
| std_io.stdInputMode = 'netcat' | ||
|
|
||
| # is the application already running? If yes then exit. | ||
| shared.thisapp = singleinstance("", daemon) | ||
|
|
@@ -263,7 +272,12 @@ def start(self): | |
| sqlLookup.start() | ||
|
|
||
| Inventory() # init | ||
| Dandelion() # init, needs to be early because other thread may access it early | ||
|
|
||
| # start network components if networking is enabled | ||
| if state.enableNetwork: | ||
| Dandelion() # init, needs to be early because other thread may access it early | ||
| else: | ||
| state.dandelion = 0 | ||
|
|
||
| # Enable object processor and SMTP only if objproc enabled | ||
| if state.enableObjProc: | ||
|
|
@@ -283,6 +297,12 @@ def start(self): | |
| objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. | ||
| objectProcessorThread.start() | ||
|
|
||
| elif state.enableSTDIO and std_io.stdInputMode == 'netcat': | ||
| # Start the thread that outputs objects to stdout in netcat mode | ||
| objectStdOutThread = std_io.objectStdOut() | ||
| objectStdOutThread.daemon = False # same as objectProcessorThread | ||
| objectStdOutThread.start() | ||
|
|
||
| # Start the cleanerThread | ||
| singleCleanerThread = singleCleaner() | ||
| singleCleanerThread.daemon = True # close the main program even if there are threads left | ||
|
|
@@ -309,6 +329,12 @@ def start(self): | |
| singleAPIThread.daemon = True # close the main program even if there are threads left | ||
| singleAPIThread.start() | ||
|
|
||
| # STDIO MODES - Start the STDIN thread | ||
| if state.enableSTDIO: | ||
| stdinThread = std_io.stdInput(sys.stdin) | ||
| stdinThread.daemon = True | ||
| stdinThread.start() | ||
|
|
||
| # start network components if networking is enabled | ||
| if state.enableNetwork: | ||
| BMConnectionPool() | ||
|
|
@@ -440,7 +466,10 @@ def usage(self): | |
| -h, --help show this help message and exit | ||
| -c, --curses use curses (text mode) interface | ||
| -d, --daemon run in daemon (background) mode | ||
|
|
||
| Advanced modes: | ||
| -t, --test dryrun, make testing | ||
| -n, --mode-netcat no GUI, read and write raw objects on STDIO | ||
|
|
||
| All parameters are optional. | ||
| ''' | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| """ | ||
| STDIO handling threads for netcat and airgap modes | ||
|
|
||
| stdInput thread: receives hex-encoded bitmessage objects on STDIN | ||
| Supported input formats, format is auto-detected: | ||
| - each line a hex-encoded object | ||
| - each line formatted: hex_timestamp - tab - hex-encoded_object | ||
| (the output format of netcat mode) | ||
|
|
||
| objectStdOut thread: replaces the objectProcessor thread in netcat mode, | ||
| outputs to STDOUT in format: hex_timestamp - tab - hex-encoded_object | ||
| """ | ||
|
|
||
| import threading | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maintain a order all the imports should be first.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| import time | ||
| import protocol | ||
| import queues | ||
| import state | ||
| import shutdown | ||
| import shared # statusIconColor | ||
|
|
||
| from struct import unpack | ||
| from binascii import hexlify, unhexlify | ||
| from addresses import decodeVarint, calculateInventoryHash | ||
| from debug import logger | ||
| from inventory import Inventory | ||
| from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute | ||
|
|
||
| stdInputMode = 'netcat' # process STDIN in netcat mode by default | ||
| netcatExitOnEOF = True # exit program if EOF on STDIN | ||
|
|
||
|
|
||
| class stdInput(threading.Thread): | ||
| """ | ||
| Standard Input thread reads objects from STDIN, posts them to Inventory | ||
| """ | ||
|
|
||
| def __init__(self, inputSrc): | ||
| threading.Thread.__init__(self, name="stdInput") | ||
| self.inputSrc = inputSrc | ||
| logger.info('stdInput thread started.') | ||
|
|
||
| def run(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please write a method with only 15 or 20 lines.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. Go big or go home :)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Over longer term, I would also prefer to have shorter methods, some parts of the old code, like the class_objectProcessor are too long, but for this PR it's fine the way it is.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted with thanks. I think the ad-hoc object parsing accounts for a lot of avoidable LLOC wastage, however, as discussed in PR #1149 , there was no readily usable parser function to use instead. |
||
| while True: | ||
| # read a line in hex encoding | ||
| line = self.inputSrc.readline() | ||
| if not line: | ||
| logger.info("STDIN: End of input") | ||
| if netcatExitOnEOF: | ||
| shutdown.doCleanShutdown() | ||
| break | ||
|
|
||
| hexObject = line.rstrip() | ||
| hexTStamp = '' | ||
|
|
||
| # detect timestamp-tab-object format (as output by netcat mode) | ||
| if "\t" in hexObject: | ||
| hexTStamp = hexObject.split("\t")[0] | ||
| hexObject = hexObject.split("\t")[-1] | ||
|
|
||
| # unhex the input with error rejection | ||
| try: | ||
| binObject = unhexlify(hexObject) | ||
| except TypeError: # fix codacy warning | ||
| logger.info("STDIN: Invalid input format") | ||
| continue | ||
|
|
||
| # sanity check on object size | ||
| if len(binObject) < 22: | ||
| logger.info("STDIN: Invalid object size") | ||
| continue | ||
|
|
||
| if not state.enableNetwork and state.enableGUI: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You set
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. state.enableGUI is only false in netcat mode; std_io may have other uses outside of netcat mode, which are not currently included. Yes, the conditional makes sense from a logical perspective. |
||
| # in airgap mode, trick the status icon that we are in fact | ||
| # NOT waiting for a connection | ||
| # (may be removed after impact analysis) | ||
| shared.statusIconColor = 'yellow' | ||
|
|
||
| if stdInputMode == 'airgap': | ||
| # airgap mode uses the timestamp | ||
| # unhex the timestamp with error rejection | ||
| if len(hexTStamp) == 16: | ||
| try: | ||
| # stdioStamp, = unpack('>Q', unhexlify(hexTStamp)) | ||
| _, = unpack('>Q', unhexlify(hexTStamp)) | ||
| except (struct.error, TypeError): # fix codacy warning | ||
| logger.info("STDIN: Invalid timestamp format: " + hexTStamp) | ||
| continue | ||
|
|
||
| # check that proof of work is sufficient. | ||
| if not protocol.isProofOfWorkSufficient(binObject): | ||
| logger.info('STDIN: Insufficient Proof of Work') | ||
| continue | ||
|
|
||
| # extract expiry time, object type | ||
| eTime, = unpack('>Q', binObject[8:16]) | ||
| objectType, = unpack('>I', binObject[16:20]) | ||
|
|
||
| # extract version number and stream number | ||
| readPosition = 20 # bypass the nonce, time, and object type | ||
| # versionNumber, versionLength | ||
| _, versionLength = decodeVarint(binObject[readPosition:readPosition + 10]) | ||
| readPosition += versionLength | ||
| streamNumber, streamNumberLength = decodeVarint(binObject[readPosition:readPosition + 10]) | ||
| readPosition += streamNumberLength | ||
|
|
||
| # calculate inventory hash | ||
| inventoryHash = calculateInventoryHash(binObject) | ||
|
|
||
| # duplicate check on inventory hash (both netcat and airgap) | ||
| if inventoryHash in Inventory(): | ||
| logger.debug("STDIN: Already got object " + hexlify(inventoryHash)) | ||
| continue | ||
|
|
||
| # in netcat mode, push object to inventory and id to output queue | ||
| if stdInputMode == 'netcat': | ||
| Inventory()[inventoryHash] = (objectType, streamNumber, binObject, eTime, '') | ||
| logger.debug("STDIN: Accepted object (type=%u) " % objectType + hexlify(inventoryHash)) | ||
| queues.invQueue.put((streamNumber, inventoryHash)) | ||
|
|
||
| # honour global shutdown flag | ||
| if state.shutdown != 0: | ||
| logger.info('stdInput thread shutdown.') | ||
| break | ||
|
|
||
|
|
||
| class objectStdOut(threading.Thread): | ||
| """ | ||
| The objectStdOut thread receives network objects from the receiveDataThreads. | ||
| """ | ||
| def __init__(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Follow Pep8 styling format.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| threading.Thread.__init__(self, name="objectStdOut") | ||
|
|
||
| # REFACTOR THIS with objectProcessor into objectProcessorQueue | ||
| queryreturn = sqlQuery( | ||
| '''SELECT objecttype, data FROM objectprocessorqueue''') | ||
| for row in queryreturn: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By waiting until we have synchronously pulled all objects from the database and put all objects on the queue we are missing out on some benefits of parallelism and we'll hit a memory limit with large data sets. Maybe this is unavoidable or related to your suggested refactoring. Also, keeping the number of places where raw SQL is used to a minimum is a good idea. Again, I assume this would be part of your suggested refactoring.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The block marked by |
||
| objectType, data = row | ||
| queues.objectProcessorQueue.put((objectType, data)) | ||
| sqlExecute('''DELETE FROM objectprocessorqueue''') | ||
| logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn))) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pep8 styling missing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| # /REFACTOR THIS | ||
|
|
||
| def run(self): | ||
| while True: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the queue is empty would this not cause unnecessary 100% CPU? Perhaps a small sleep is needed inside the while loop? Edit: no it wouldn't, Queue.get(block=True) blocks when the queue is empty. I was thinking of the AMQP library I was most recently using. |
||
| objectType, data = queues.objectProcessorQueue.get() | ||
|
|
||
| # Output in documented format | ||
| print "%016x" % int(time.time()) + '\t' + hexlify(data) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we use sys.stdout.write() instead of print here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be indeed the logical choice, I'm just not sure how cross-platform it would be. Print is 100% portable and not wrong. |
||
|
|
||
| if state.shutdown: | ||
| time.sleep(.5) # Wait just a moment for most of the connections to close | ||
|
|
||
| # REFACTOR THIS with objectProcessor into objectProcessorQueue | ||
| numberOfObjectsInObjProcQueue = 0 | ||
| with SqlBulkExecute() as sql: | ||
| while queues.objectProcessorQueue.curSize > 0: | ||
| objectType, data = queues.objectProcessorQueue.get() | ||
| sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', | ||
| objectType, data) | ||
| numberOfObjectsInObjProcQueue += 1 | ||
| logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % | ||
| str(numberOfObjectsInObjProcQueue)) | ||
| # /REFACTOR THIS | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes please! |
||
|
|
||
| state.shutdown = 2 | ||
| break | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The option '-n' often has connotations of 'dry run' or 'numeric only'. While we don't have anything using 'n' right now I'm thinking of avoiding future confusion if we do. Is there another letter that you would say makes as much sense as 'n'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought of that and decided that none of the common uses of "-n" (dry-run, no-resolve, no-output, numeric etc) are applicable in the Bitmessage context.
A second best option would be "-c" (netCat) which is already taken.