Client(self, token: str, server_url: str = 'work.withpixie.ai', use_encryption: bool = False, channel_fn: Callable[[str], grpc.Channel] = None, conn_channel_fn: Callable[[str], grpc.aio._base_channel.Channel] = None )
Client is the main entry point to the Pixie API.
To set up the client, generate an API token and pass it in as the first argument. See https://docs.px.dev/using-pixie/api-quick-start/ for more info.
def connect_to_cluster(self, cluster: Union[str, pxapi.client.Cluster]) -> pxapi.client.Conn
Connect to a cluster.
Returns a connection object that you can use to create ScriptExecutors.
You may pass in a ClusterID string or a Cluster object that comes from list_healthy_clusters().
def list_healthy_clusters(self) -> List[pxapi.client.Cluster]
Lists all of the healthy clusters that you can access.
Cluster(self, cluster_id: str, cluster_info: src.api.proto.cloudpb.cloudapi_pb2.ClusterInfo )
Cluster contains information users need about a specific cluster.
Mainly a convenience wrapper around the protobuf message so you can access the name in a simple format.
def name(self) -> str
Returns the name if that info exists, otherwise returns the id.
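A minimal sketch of connecting to a cluster by name rather than by ID, using only the calls documented above. The helper connect_by_name is hypothetical and not part of pxapi; it assumes client behaves like a Client (it needs only list_healthy_clusters() and connect_to_cluster()).

```python
def connect_by_name(client, cluster_name):
    """Return a connection to the healthy cluster called cluster_name.

    Hypothetical helper: `client` is assumed to behave like pxapi's Client.
    """
    for cluster in client.list_healthy_clusters():
        # Cluster.name() falls back to the cluster ID when no name is known.
        if cluster.name() == cluster_name:
            return client.connect_to_cluster(cluster)
    raise LookupError(f"no healthy cluster named {cluster_name!r}")
```

Raising LookupError for a missing name is a choice made for this sketch, not behavior of the library.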
Conn(self, token: str, pixie_url: str, cluster_id: str, use_encryption: bool = True, cluster_info: src.api.proto.cloudpb.cloudapi_pb2.ClusterInfo = None, channel_fn: Callable[[str], grpc.aio._base_channel.Channel] = None )
The logical representation of a connection.
Holds the authorization information and handles the creation of an authorized gRPC channel.
def name(self) -> str
Get the name of the cluster for this connection.
def prepare_script(self, script_str: str) -> pxapi.client.ScriptExecutor
Create a new ScriptExecutor for the script to run on this connection.
ScriptExecutor(self, conn: pxapi.client.Conn, pxl: str, use_encryption: bool)
ScriptExecutor encapsulates the connection logic to Pixie instances.
If you want to get Pixie data, you will need to initialize ScriptExecutor with the clusters and PxL script, then call results() for the desired table name and iterate over the results.
Note: you can only invoke results(), run(), and run_async() once on a ScriptExecutor object. If you need to execute a script multiple times, you must create a new ScriptExecutor object and set up any data processing again. We rely on iterators that must close when a script stops running and cannot allow multiple runs per object.
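Because each ScriptExecutor is single-use, one way to rerun a script is to wrap the prepare-and-iterate steps in a function that builds a fresh executor on every call. The sketch below assumes conn behaves like a Conn; fetch_table is a hypothetical helper, not part of pxapi.

```python
def fetch_table(conn, pxl, table_name):
    """Run pxl on conn once and return every row of table_name as a list.

    Hypothetical helper: `conn` is assumed to behave like pxapi's Conn.
    """
    # A ScriptExecutor is single-use, so build a new one for every run.
    script = conn.prepare_script(pxl)
    return list(script.results(table_name))
```

Collecting rows into a list only makes sense for a script that finishes; for a long-running script, iterating over results() directly is likely the better fit.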
def add_callback(self, table_name: str, fn: Callable[[pxapi.data.Row], NoneType] ) -> None
Adds a callback fn that will be invoked on every row of table_name as they arrive.
Callbacks are not invoked until you call run() (or run_async()) on the object.
If you add_callback on a table not produced by the script, run() (or run_async()) will raise a ValueError when the underlying gRPC channel closes.
The internals of ScriptExecutor use the Python async API, and the callback fn will be called concurrently while the ScriptExecutor is running. Note that callbacks themselves should not be async functions.
Callbacks block the rest of script execution, so avoid expensive or non-terminating callbacks.
Raises:
ValueError: If called on a table that's already been passed as arg to subscribe or add_callback.
ValueError: If called after run() or run_async() for a particular ScriptExecutor.
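The callback flow described above (register first, then run) might look like the following sketch. It assumes script behaves like a fresh ScriptExecutor that has not been run yet; count_rows is a hypothetical helper, not part of pxapi.

```python
def count_rows(script, table_name):
    """Count the rows of table_name produced by one run of script.

    Hypothetical helper: `script` is assumed to behave like a fresh,
    not-yet-run pxapi ScriptExecutor.
    """
    count = 0

    def on_row(row):
        # A plain (non-async) function; the work is kept small because
        # callbacks block the rest of script execution.
        nonlocal count
        count += 1

    script.add_callback(table_name, on_row)  # register before running
    script.run()  # callbacks are invoked while run() executes
    return count
```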
def results(self, table_name: str ) -> Generator[pxapi.data.Row, NoneType, NoneType]
Runs the script and returns the results for the table.
Examples:
    for row in script.results("http_table"):
        print(row)
Raises:
ValueError: If table_name is never sent during the lifetime of the script.
ValueError: If called after run() or run_async() for a particular ScriptExecutor.
def run(self) -> None
Executes the script synchronously.
Calls run_async() but hides the asyncio details from users.
If any errors occur over the lifetime of any connection, this will raise an error.
Raises:
ValueError: If any callbacks are on tables that a ScriptExecutor never receives.
ValueError: If called after run() or run_async() for a particular ScriptExecutor.
def run_async(self) -> None
Runs the script asynchronously using asyncio.
Same as run(), except you can directly control whether other tasks should be run concurrently while the script is running.
Raises:
ValueError: If any callbacks are on tables that a ScriptExecutor never receives.
ValueError: If called after run() or run_async() for a particular ScriptExecutor.
def subscribe(self, table_name: str) -> pxapi.client.TableSub
Returns an async generator that outputs rows for the table.
Raises:
ValueError: If called on a table that's already been passed as arg to subscribe or add_callback.
ValueError: If called after run() or run_async() for a particular ScriptExecutor.
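A sketch of consuming a subscription while the script runs. The signature above lists run_async() as a plain def, so this assumes it is in fact awaitable, and that the object returned by subscribe() can be used with async for. collect_subscription is a hypothetical helper, not part of pxapi.

```python
import asyncio


async def collect_subscription(script, table_name):
    """Gather the rows of table_name while the script runs.

    Hypothetical helper: `script` is assumed to behave like a fresh pxapi
    ScriptExecutor whose run_async() is awaitable.
    """
    table_sub = script.subscribe(table_name)  # subscribe before running
    rows = []

    async def consume():
        async for row in table_sub:
            rows.append(row)

    # Run the script and the consumer concurrently on the same event loop.
    await asyncio.gather(script.run_async(), consume())
    return rows
```

From synchronous code, a helper like this could be driven with asyncio.run(collect_subscription(script, "http_table")).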
def subscribe_all_tables(self ) -> Callable[[], AsyncGenerator[pxapi.client.TableSub, NoneType]]
Returns an async generator that outputs table subscriptions as they arrive.
You can use this generator to call PxL scripts without knowing the tables that are output beforehand. If you do know the tables beforehand, you should use subscribe, add_callback, or even results instead, to prevent your API client from keeping data for tables that you don't use.
This generator will only start iterating after run_async() has been called. For the best performance, you will want to call the consumer of the object returned by subscribe_all_tables concurrently with run_async().
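The concurrency advice above can be sketched as follows. Going by the signature, subscribe_all_tables() returns a callable that produces the async generator, so the sketch calls its result before iterating. It also assumes run_async() is awaitable and that each table subscription can be used with async for. count_rows_in_all_tables is a hypothetical helper, not part of pxapi.

```python
import asyncio


async def count_rows_in_all_tables(script):
    """Return the total number of rows across every table the script emits.

    Hypothetical helper: `script` is assumed to behave like a fresh pxapi
    ScriptExecutor whose run_async() is awaitable.
    """
    # Per the signature, this is a callable that returns the async generator.
    table_subs = script.subscribe_all_tables()

    async def count_table(table_sub):
        n = 0
        async for _row in table_sub:
            n += 1
        return n

    async def consume():
        tasks = []
        async for table_sub in table_subs():
            # Drain each table in its own task so tables do not wait on
            # one another.
            tasks.append(asyncio.ensure_future(count_table(table_sub)))
        return sum(await asyncio.gather(*tasks))

    # Run the script and the consumer concurrently.
    _, total = await asyncio.gather(script.run_async(), consume())
    return total
```

Draining each table in a separate task is a design choice of this sketch; whether tables can safely be consumed one after another depends on library internals not described here.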
TableSub(self, name: str, table_gen: AsyncGenerator[Union[pxapi.data._TableStream, Literal['ERROR']], NoneType] )
TableSub is an async generator that yields rows for a table.
You should avoid directly initializing TableSub objects. Instead, create a ScriptExecutor object and call ScriptExecutor.subscribe() for a specific table, or ScriptExecutor.subscribe_all_tables(). This avoids the complexity involved in creating this object.
For more advanced users: the TableSub object is a promise that a table with the specified name will be yielded by the table_gen. If the table does not get yielded, the async generator will throw an error when the table_gen exits.
Row(self, table: pxapi.data._TableStream, data: List[Any])
Row represents a row of data for a particular table. You can easily access data in the row by using the column name from the associated table.
Designed specifically to avoid allocating memory for the relation for each row.
Examples:
    >>> tableA = Table("a", relation=(("cola", int), ("colb", int), ("colc", string)))
    >>> row = Row(tableA, [1, 2, "three"])
    >>> row["cola"]
    1
    >>> row["colb"]
    2
    >>> row["colc"]
    "three"
    >>> row
    { "cola": 1, "colb": 2, "colc": "three" }
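To illustrate the idea of rows that share one relation instead of each carrying a copy, here is a self-contained sketch. SketchTable and SketchRow are hypothetical stand-ins written for this example; they are not pxapi's Row or its internal table type, and the real implementation may differ.

```python
class SketchTable:
    """Hypothetical stand-in for a table: a name plus one shared relation."""

    def __init__(self, name, relation):
        self.name = name
        self.relation = tuple(relation)  # ((column name, type), ...)
        # Built once per table and shared by every row.
        self.col_index = {col: i for i, (col, _type) in enumerate(self.relation)}


class SketchRow:
    """Hypothetical stand-in for a row: values plus a reference to its table."""

    __slots__ = ("table", "data")

    def __init__(self, table, data):
        if len(data) != len(table.relation):
            raise ValueError("row length does not match the relation")
        self.table = table  # a reference, not a copy of the relation
        self.data = data

    def __getitem__(self, column):
        return self.data[self.table.col_index[column]]

    def as_dict(self):
        return {col: self.data[i] for col, i in self.table.col_index.items()}


table_a = SketchTable("a", (("cola", int), ("colb", int), ("colc", str)))
row = SketchRow(table_a, [1, 2, "three"])
print(row["cola"], row["colc"])
print(row.as_dict())
```

Each row stores only its values and a pointer to the table, so the column names and types exist once no matter how many rows arrive.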
CryptoOptions(self)
def encrypt_options(self) -> src.api.proto.vizierpb.vizierapi_pb2.EncryptionOptions