spark_db Flashcards
to pandas
sparkdf.toPandas()
from pandas
spark.createDataFrame(pandasdf, schema)
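A minimal round-trip sketch, assuming a SparkSession named spark is already available (as it is in Databricks notebooks):
import pandas as pd

pandas_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
sparkdf = spark.createDataFrame(pandas_df)   # schema is inferred when omitted
back_to_pandas = sparkdf.toPandas()          # collects the data to the driver as a pandas DataFrame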
read and write
spark.read      # DataFrameReader
df.write        # DataFrameWriter
spark.read.jdbc(url=jdbc_url, table="tablename")
df.write.mode("overwrite").jdbc(url=jdbc_url, table="tablename")   # or mode("append")
#csv write #save without changing the schema ??
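A hedged sketch of the CSV write (output_path is a placeholder; the "without changing the schema" part is addressed under the "overwrite table without changing schema" card below):
# write out as CSV with a header row; "overwrite" replaces any existing files at the path
df.write.mode("overwrite").option("header", "true").csv(output_path)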
show
df.show(n)   # n = number of rows to show, as an integer
filter based on condition
df.filter("experiment_id = 1").show()
df.filter((df.col1 == value1) & (df.col2 == value2))
df.filter((df.col1 == value1) & (df.col2 == value2)).select("colname")
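A concrete instance of the same pattern (column names are illustrative):
# rows where experiment_id is 1 and status is 'Completed', keeping one column
df.filter((df.experiment_id == 1) & (df.status == "Completed")).select("experiment_id").show()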
replace values on condition
df.replace(old,new)
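A short sketch with the optional subset argument (column name is illustrative):
# replace the string "N/A" with "unknown", but only in the status column
df = df.replace("N/A", "unknown", subset=["status"])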
sparkdf select
sparkdf.select("col name")
select / drop a column by index
df = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])
n = 1
df.select(df.columns[n]).show()
+---+
|  b|
+---+
|  2|
|  4|
+---+
df.drop(df.columns[n]).show()
+---+
|  a|
+---+
|  1|
|  3|
+---+
df.select(df.columns[:n] + df.columns[n + 1:]).show()
+---+
|  a|
+---+
|  1|
|  3|
+---+
import pyspark types to construct schema
from pyspark.sql.types import *
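A short sketch of building an explicit schema with these types (field names are illustrative):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("experiment_id", IntegerType(), True),   # nullable integer column
    StructField("status", StringType(), True),
])
df = spark.createDataFrame([(1, "Completed")], schema)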
run sql query
spark.sql("select * from table limit 100")
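spark.sql only sees tables registered in the metastore or as temp views; a minimal sketch (view name is illustrative):
df.createOrReplaceTempView("experiments")                 # expose a DataFrame to SQL
spark.sql("select * from experiments limit 100").show()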
connect to an external database using pyodbc
import pyodbc
Driver = '{ODBC Driver 17 for SQL Server}'
Server = sql_server
Database = sql_db
cnxn = pyodbc.connect('DRIVER=' + Driver + ';SERVER=' + Server.split(":")[0] + ';DATABASE=' + Database + ';UID=' + sql_user + ';PWD=' + sql_pass)
cursor = cnxn.cursor()
cursor.execute("update Experiment set experimentStatus = 'Completed' where experimentId = {}".format(experiment_id))
cnxn.commit()
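A hedged follow-up for reading rows back over the same connection (table and column names are the ones used above):
cursor.execute("select experimentId, experimentStatus from Experiment")
rows = cursor.fetchall()        # list of pyodbc Row objects
cnxn.close()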
get secrets in databricks
dbutils.secrets.get('nlpexplorer-secret-scope', 'sql-username')
first create the key:value pair in key vault
create secrets in databricks
https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes
databricks jdbc url
Obtain it from the database's connection strings (e.g. the JDBC connection string in the Azure portal)
curl commands for installing the SQL driver
%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get -q -y install msodbcsql17
change datatype in pyspark
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

df2 = df.withColumn("age", col("age").cast(StringType()))
https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/
change datatype in pyspark.
with withColumn
with select
with selectExpr
with spark.sql
# Convert String to Integer Type with withColumn
df.withColumn("salary", col("salary").cast(IntegerType()))
df.withColumn("salary", col("salary").cast("int"))
df.withColumn("salary", col("salary").cast("integer"))
# Using select
df.select(col("salary").cast("int").alias("salary"))
# Using selectExpr()
df.selectExpr("cast(salary as int) salary", "isGraduated")
df.selectExpr("INT(salary)", "isGraduated")
# Using spark.sql()
spark.sql("SELECT INT(salary), BOOLEAN(isGraduated), gender from CastExample")
spark.sql("SELECT cast(salary as int) salary, BOOLEAN(isGraduated), gender from CastExample")
databricks dbutils to get parameters
dbutils.widgets.text("ExperimentId", "DefaultName")
experiment_id = dbutils.widgets.get("ExperimentId")
call db notebook from another notebook
dbutils.notebook.run("notebook path", timeout_seconds, {})   # third argument: dict of parameters passed to the called notebook's widgets
send results out of databricks after run
dbutils.notebook.exit("value")
import json
dbutils.notebook.exit(json.dumps({"key": "value"}))
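On the calling side, the exit value comes back as the return value of dbutils.notebook.run and can be parsed (a sketch with placeholder path and timeout):
result = dbutils.notebook.run("notebook path", 600, {})
payload = json.loads(result)   # parse the JSON string passed to dbutils.notebook.exit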
add arbitrary column to spark
from pyspark.sql.functions import lit
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
get no of rows
sparkdf.count()
get column names
sparkdf.columns
turn databricks notebook cell to run sql commands
%sql
sql query
#http requests Using job id
- create a job in databricks using jobs
- copy job id
- in postman create post request
- add authorisation token (get it from user settings)
- add the API request: https://adb-3184120056443208.8.azuredatabricks.net/api/2.0/jobs/run-now
- set the body to raw JSON
- add the json string
{ "job_id":5670, "notebook_params":{ "lob": "4", "dataSource": "CAP", "customerGeos": "107,1", "textFields": "9", "channel": "13", "cssProducts": "16", "issueCodes": "37056"}, "notebook_task": {"notebook_path": "/Nlp_explorer/Casecount" }}
#http requests Using runs/submit (returns a run id)
https://adb-3184120056443208.8.azuredatabricks.net/api/2.0/jobs/runs/submit
{ "existing_cluster_id": "0510-115652-stop848", "notebook_params":{ "lob": "4", "dataSource": "CAP", "customerGeos": "107,1", "textFields": "9", "channel": "13", "cssProducts": "16", "issueCodes": "37056"}, "notebook_task": {"notebook_path": "/Nlp_explorer/Casecount" }}
get for run id
https://adb-3184120056443208.8.azuredatabricks.net/api/2.0/jobs/runs/get
{
  "run_id": 5670
}
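A hedged sketch of the same status check from Python (host, token, and the requests import as in the earlier run-now sketch):
resp = requests.get(
    f"{host}/api/2.0/jobs/runs/get",
    headers={"Authorization": f"Bearer {token}"},
    params={"run_id": 5670},
)
print(resp.json()["state"])   # life_cycle_state / result_state of the run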
create empty dataframe
# the schema (with types) must be given explicitly, since there is no data to infer it from
spark.createDataFrame([], "col1 string, col2 string")
how to check if df is empty
df.rdd.isEmpty()
read csv with header
a Medium article has more on read and write, including JSON
spark.read.format("csv").option("header", "true").load(filepath)
overwrite table without changing schema
trans something………… forgot
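This likely refers to the JDBC truncate option (an assumption based on the "trans…" fragment above): with mode("overwrite"), truncate=true empties the existing table instead of dropping and recreating it, so the schema is preserved. A sketch with placeholder url/table and the credentials used earlier:
# overwrite the table's rows but keep its existing schema (truncate is an assumption here)
df.write.format("jdbc") \
    .mode("overwrite") \
    .option("url", jdbc_url) \
    .option("dbtable", "tablename") \
    .option("truncate", "true") \
    .option("user", sql_user) \
    .option("password", sql_pass) \
    .save()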
storing the password or key in Azure Key Vault
store the password or key in Azure Key Vault as a secret instead of hard-coding it in a notebook (e.g. in a SQL query)
dbutils.secrets.get(scope="bitools_secrets", key="blobkey")