Cloud-Clientbibliotheken für Python verwenden

Diese Anleitung umfasst eine Schritt-für-Schritt-Anleitung für Cloud Shell. Darin werden Managed Service for Apache Spark gRPC APIs mithilfe von Google Cloud-Clientbibliotheken für Python programmatisch aufgerufen, um einen Cluster zu erstellen und einen Job an den Cluster zu senden.

In den folgenden Abschnitten wird die Verwendung des Anleitungscodes erläutert, der sich im GitHub-Repository GoogleCloudPlatform/python-dataproc befindet.

Schritt-für-Schritt-Anleitung für Cloud Shell ausführen

Klicken Sie zum Ausführen der Anleitung auf Open in Cloud Shell (In Google Cloud Shell öffnen).

In Cloud Shell öffnen

Den Code verstehen

In diesem Abschnitt wird erläutert, wie der Anleitungscode die Cloud-Clientbibliotheken für Python verwendet, um sich zu authentifizieren Google Cloud, einen Cluster zu erstellen, einen Spark-Job zu senden und den Cluster zu löschen.

Standardanmeldedaten für Anwendungen

Die Schritt-für-Schritt-Anleitung für Cloud Shell in diesem Tutorial bietet eine Authentifizierung anhand der Anmeldedaten Ihres Google Cloud Projekts. Wenn Sie Code lokal ausführen, sollten Sie zum Authentifizieren Ihres Codes die Dienstkonto-Anmeldedaten verwenden.

Managed Service for Apache Spark-Cluster erstellen

Die folgenden Werte werden festgelegt, um den Cluster zu erstellen:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird.
  • Der Name des Clusters.
  • Die Clusterkonfiguration, die einen Master und zwei primäre Worker angibt.

Für die übrigen Clustereinstellungen werden die Standardkonfigurationseinstellungen verwendet. Sie können die Standardkonfigurationseinstellungen für den Cluster überschreiben. Sie können beispielsweise sekundäre VMs hinzufügen (Standard = 0) oder ein nicht standardmäßiges VPC-Netzwerk für den Cluster angeben. Weitere Informationen finden Sie unter CreateCluster.

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

Job senden

Die folgenden Werte werden festgelegt, um den Job zu senden:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird.
  • Die Jobkonfiguration, die den Clusternamen und den Cloud Storage-Dateipfad (URI) des PySpark-Jobs angibt.

Weitere Informationen finden Sie unter SubmitJob.

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

Cluster löschen

Die folgenden Werte werden festgelegt, um den Cluster zu löschen:

  • Das Projekt, in dem der Cluster erstellt wird.
  • Die Region, in der der Cluster erstellt wird.
  • Der Name des Clusters.

Weitere Informationen finden Sie unter DeleteCluster.

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")