I received many questions on my tutorial Ingest tables in parallel with an Apache Spark notebook using multithreading. In this video and post, I address some of the questions that I couldn't fully answer in the YouTube comments. Watch the video for more complete answers, but here are quick responses with links to examples where appropriate.
Question 1: How do I pass more than one argument to the load_table function?
The load_table function is what I used in the initial video to take in a table name and do the processing for ingesting data to that table. Even if you use a different type of function, the idea is that you can put a more complex object on the queue. A simple option is to use a tuple. As an example, instead of passing table_name as a string you could create a tuple like (table_name, description) and add that to the queue. Then when you get the value off the queue, you can unpack the tuple either before calling load_table or within the load_table function.
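As a rough sketch (the table names and description values here are placeholders, not from the original tutorial), the tuple approach could look like this:

from queue import Queue

q = Queue()

# Put a (table_name, description) tuple on the queue instead of a bare string
for item in [("badges", "badge awards"), ("comments", "post comments")]:
    q.put(item)

def load_table(args):
    # Unpack the tuple inside the function
    table_name, description = args
    print(f"Loading {table_name}: {description}")
    # existing ingestion logic for table_name goes here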
Question 2: Can I run a notebook per table and run it concurrently?
Yes, you can run a notebook instead of calling load_table directly. Here is the syntax for doing this in Databricks or Azure Synapse notebooks. I also have the called notebook return a value of "success" if it doesn't encounter an error.
# Databricks function with notebook run
def load_table(args):
    status = dbutils.notebook.run("stackoverflow_refined_table_load", 1800, arguments=args)
    print(status)
    if status != 'success':
        raise Exception(f"Failed to load refined database. Status: {str(status)}")
# Synapse function with notebook run
def load_table(args):
    status = mssparkutils.notebook.run("stackoverflow_refined_table_load", 1800, arguments=args)
    if status != 'success':
        raise Exception(f"Failed to load refined database. Status: {str(status)}")
Question 3: How do you handle errors in each thread?
First, handling errors in some way is important, so I should have mentioned it in the initial video. If a thread errors, something needs to catch the error and make sure the task is marked as done. Without this, the application will keep running indefinitely after a thread has been terminated due to an error. Here is a snippet of the syntax to handle errors. See the video for more about how I save the errors and report them after all threads have completed.
def run_tasks(function, q):
    while not q.empty():
        value = q.get()  # value is the arguments dict placed on the queue
        try:
            function(value)
        except Exception as e:
            table = value.get("table", "UNKNOWN TABLE")
            msg = f"Error processing table {table}: {str(e)}"
            errors[table] = e
            log_error_message(msg)
        finally:
            q.task_done()
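To show how run_tasks fits together with the threads and the error reporting, here is a simplified sketch; the errors dictionary, the log_error_message helper, and the thread count are stand-ins for what I walk through in the video:

import threading

errors = {}

def log_error_message(msg):
    # Stand-in for the logging shown in the video
    print(msg)

# Start a fixed number of worker threads that all pull from the same queue
worker_count = 5
for _ in range(worker_count):
    t = threading.Thread(target=run_tasks, args=(load_table, q), daemon=True)
    t.start()

# Wait until every queued item is marked done, then report any saved errors
q.join()
if errors:
    raise Exception(f"{len(errors)} table(s) failed to load: {list(errors.keys())}")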
The full notebook that reports errors on completion is available on my GitHub.
Question 4: Is it using the executors (workers)?
It uses the workers where possible when calling the Spark API. Each thread still keeps some activity on the driver, so if you are trying to run a lot of concurrent threads you should increase the size of the driver. I show how to check the Spark UI for activity in the video.
Question 5: How do I do this with Scala?
Databricks has examples for this which can be found here. The key thing to understand is the Future object, which is essentially a wrapper indicating that work may still be in progress before the next commands can run on the object. Futures are how you do asynchronous programming in Scala.
Final Thoughts
I am not recommending this approach for more than a couple hundred tables, though if the files are large it may still be appropriate. For a large number of distinct tables, I have used REST API calls to trigger separate jobs, but I still had to throttle the number of active jobs at some point or distribute the work across multiple clusters. As mentioned above, expanding the size of the driver node is helpful if you are running many small Spark jobs concurrently on the same cluster.